load control system (7 of 8)

Keith Muller muller at sdcc3.UUCP
Wed Feb 13 05:05:03 AEST 1985


This is part 7 of the load control system. Part 1 must be unpacked before any
other part.
	Keith Muller
	ucbvax!sdcsvax!muller


# This is a shell archive.  Remove anything before this line,
# then unpack it by saving it in a file and typing "sh file".
#
# Wrapped by sdcc3!muller on Sat Feb  9 13:58:16 PST 1985
# Contents:  server/commands.c
 
echo x - server/commands.c
sed 's/^@//' > "server/commands.c" <<'@//E*O*F server/commands.c//'

/*------------------------------------------------------------------------
 * commands.c - server
 *
 * Commands that can be executed by the server in response to client
 * datagrams
 *------------------------------------------------------------------------
 */

/* $Log$ */

#include "../h/common.h"
#include "../h/server.h"
#include <sys/file.h>
#include <sys/ioctl.h>
#include <sys/stat.h>
#include <sys/uio.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/time.h>
#include <stdio.h>
#include <signal.h>
#include <errno.h>

/*-----------------------------------------------------------------------
 * cntrldis
 *
 * Read one datagram from the control socket and dispatch on the type
 * field of the request to the routine that implements the command.
 *
 * NOTE:
 *      The control program that sent the datagram ALWAYS waits for an
 *      indication that the datagram was processed. Each routine called
 *      from here is required to send that indication; the commands
 *      handled inline below send it themselves with outcntrl().
 *
 *      SIGALRM is blocked while a request is processed. Every path out
 *      of the dispatch must reach the sigsetmask() at the bottom of this
 *      routine: leave a case with "break", never with "return", or the
 *      signal stays blocked and the interval timer is never seen again.
 *-----------------------------------------------------------------------
 */

cntrldis()
{
	struct request work;	/* datagram space */
	int oldmask;		/* signal mask in force on entry */
	int fromlen = 0;
	extern int cntrlsock;
	extern int alrmmask;
	extern int newstatus;
#ifdef sun
	extern long loadlevel;
#else
	extern double loadlevel;
#endif /* sun */
	extern int full;
	extern int errno;
	extern u_long mqtime;

	/*
	 * BLOCK OFF SIGALRM as the called routines modify the
	 * internal data structures and cannot be interrupted
	 * by the interval timer. That would corrupt the linked
	 * lists.
	 */
	oldmask = sigblock(alrmmask);

	if (recvfrom(cntrlsock,&work,sizeof(struct request),0,(struct sockaddr *)0,&fromlen) <= 0){
		if (errno != EINTR)
			errlog("error in cntrldis recv");
		(void)sigsetmask(oldmask);
		return;
	}

	/*
	 * dispatch on type of request
	 */
	switch(work.type){
		case RJOBCMD:
			/*
			 * run a job by pid in the queue
			 * (privileged command)
			 */
			runjob(&work);
			break;
		case RUSRCMD:
			/*
			 * run all of a user's jobs in the queue
			 * (privileged command)
			 */
			runusr(&work);
			break;
		case PJOBCMD:
			/*
			 * a request to remove a job from the queue
			 * (also can be called from msgdis())
			 */
			prjob(&work, cntrlsock);
			break;
		case PUSRCMD:
			/*
			 * remove all of a user's jobs in the queue
			 */
			prusr(&work);
			break;
		case PALLCMD:
			/*
			 * purge the ENTIRE queue
			 * (privileged command)
			 */
			prall(&work);
			break;
		case ABORTCMD:
			/*
			 * make sure socket is owned by root, otherwise
			 * reject it
			 */
			if (chksock(CNTRLPRE, work.pid, 0) == 0){
				(void)outcntrl(work.pid, STOPCMD);
				break;
			}

			/*
			 * tell the server to terminate
			 * (privileged command)
			 */
			(void)outcntrl(work.pid, RUNCMD);
			cleanup();
			break;
		case MOVECMD:
			/*
			 * make sure socket is owned by root, otherwise
			 * reject it
			 */
			if (chksock(CNTRLPRE, work.pid, 0) == 0){
				(void)outcntrl(work.pid, STOPCMD);
				break;
			}

			/*
			 * move a process in the queue
			 * (privileged command)
			 */
			if (movequeue(work.time, work.uid) == 0)
				(void)outcntrl(work.pid, RUNCMD);
			else
				(void)outcntrl(work.pid, STOPCMD);
			break;
		case LOADLIMCMD:
			/*
			 * make sure socket is owned by root, otherwise
			 * reject it
			 */
			if (chksock(CNTRLPRE, work.pid, 0) == 0){
				(void)outcntrl(work.pid, STOPCMD);
				break;
			}

			/*
			 * change the load level at which queueing starts
			 * (privileged command). On non-sun systems the
			 * value in the datagram is scaled down by 256.
			 */
#ifdef sun
			loadlevel = (long)work.time;
#else
			loadlevel = ((double)work.time)/256.0;
#endif /* sun */
			newstatus = 1;
			(void)outcntrl(work.pid, RUNCMD);
			break;
		case STATUSCMD:
			/*
			 * update the status file if necessary
			 */
			status(&work);
			break;
		case LISTCMD:
			/*
			 * update the queue list file if necessary
			 */
			list(&work);
			break;
		case MQTIMECMD:
			/*
			 * make sure socket is owned by root, otherwise
			 * reject it
			 */
			if (chksock(CNTRLPRE, work.pid, 0) == 0){
				(void)outcntrl(work.pid, STOPCMD);
				break;
			}

			/*
			 * change the maximum time a job can wait
			 * (privileged command)
			 */
			mqtime = work.time;
			newstatus = 1;
			(void)outcntrl(work.pid, RUNCMD);
			break;
		case QUEUESIZE:
			/*
			 * make sure socket is owned by root, otherwise
			 * reject it
			 */
			if (chksock(CNTRLPRE, work.pid, 0) == 0){
				(void)outcntrl(work.pid, STOPCMD);
				break;
			}

			/*
			 * change the maximum size limit on the
			 * queue of waiting jobs
			 * (privileged command)
			 */
			full = (int)work.time;
			newstatus = 1;
			(void)outcntrl(work.pid, RUNCMD);
			break;
		case CHTIMER:
			/*
			 * change interval when load level checked
			 * (privileged command)
			 */
			chtimer(&work);
			break;
		default:
			/*
			 * errno is cleared before logging because no
			 * system call failed here.
			 */
			errno = 0;
			errlog("cntrldis bad command");
			(void)outcntrl(work.pid, STOPCMD);
			break;
	}

	/*
	 * UNBLOCK SIGALRM so interval timer can check load
	 * to dispatch a job.
	 */

	(void)sigsetmask(oldmask);
}

/*-----------------------------------------------------------------------
 * msgdis
 *
 * msgdis reads the datagram on msg socket port. Then calls the
 * appropriate routine as encoded in the datagrams type field. 
 * NOTE:
 *      The client that sent the datagram ALWAYS waits for an indication
 *      that the datagram was processed. Each routine is required to send
 *      an indication to the client that the request has been processed.
 *-----------------------------------------------------------------------
 */

msgdis()
{
	struct request work;	/* datagram space */
	int oldmask;		/* old value of signal mask  */
	int fromlen = 0;
	extern int msgsock;
	extern int alrmmask;
	extern int errno;

	/*
	 * BLOCK OFF SIGALRM as the called routines modify the 
	 * internal data structures and cannot be interrupted
	 * by the iterval timer. That would corrupt the linked
	 * lists.
	 */
	oldmask = sigblock(alrmmask);

	if (recvfrom(msgsock,&work,sizeof(struct request),0,(struct sockaddr *)0,&fromlen) <= 0){
		if (errno != EINTR)
			errlog("error in msgdis recv");
		(void)sigsetmask(oldmask);
		return;
	}

	/*
	 * dispatch on type of request
	 */
	if (work.type == POLLCMD){
		/*
		 * a client making sure he is in the queue
		 * same as a QCMD, but addjob handles them differently.
		 */
		addjob(&work);
	}else if (work.type == QCMD){
		/*
		 * a request to queue a process
		 */
		addjob(&work);
	}else if (work.type == PJOBCMD){
		/*
		 * a request to remove a job from the queue
		 * should only be from a terminating client
		 * (also called from cntrldis())
		 */
		prjob(&work, msgsock);
	}else{
		errno = 0;
		errlog("msgdis bad command");
		(void)outmsg(work.pid, STOPCMD);
	}

	/*
	 * UNBLOCK SIGALRM so interval timer can check load
	 * to dispatch a job.
	 */
	(void)sigsetmask(oldmask);
}

/*-------------------------------------------------------------------------
 * addjob
 *
 * Handle a request (datagram "work") from a client that wants to be
 * queued. With an empty queue and a positive result from getrun() the
 * client is simply told to run. Otherwise the request is passed to
 * addqueue() and the client is answered according to the value returned:
 *      0   queued, and the interval timer is (re)started
 *      1   queued
 *     -1   addqueue failed; the job at the head of the queue is released
 *          to make room, or the client is told to run if the queue is
 *          empty
 *     -2   queue is full, the client is rejected
 * Any other value is logged and the server exits.
 *-------------------------------------------------------------------------
 */

addjob(work)
struct request *work;
{
	struct itimerval prevalrm;	/* old timer value, not used */
	int result;			/* what addqueue() returned */
	extern int qcount;
	extern struct itimerval startalrm;
	extern int addqueue();
	extern int timerstop;
	extern struct qnode *qhead;

	/*
	 * nothing is waiting and the load allows it: run right away
	 */
	if ((qcount == 0) && (getrun() > 0)){
		(void)outmsg(work->pid, RUNCMD);
		return;
	}

	result = addqueue(work);

	if ((result == 0) || (result == 1)){
		if (result == 0){
			/*
			 * the queue was empty before this job, so the
			 * timer has to be turned back on
			 */
			if (setitimer(ITIMER_REAL,&startalrm, &prevalrm)<0){
				errlog("start timer error");
				exit(1);
			}
			timerstop = 0;
		}
		/*
		 * the job is in the queue, tell the client
		 */
		(void)outmsg(work->pid, QCMD);
	}else if (result == -1){
		/*
		 * addqueue failed. Try to free a slot by letting the
		 * oldest job run, then queue this one.
		 */
		if (qcount > 0){
			(void)outmsg(qhead->pid, RUNCMD);
			(void)rmqueue(qhead);
			(void)addqueue(work);
			(void)outmsg(work->pid, QCMD);
		}else{
			(void)outmsg(work->pid, RUNCMD);
		}
	}else if (result == -2){
		/*
		 * a new job and no room left in the queue: reject it
		 */
		(void)outmsg(work->pid, FULLQUEUE);
	}else{
		/*
		 * addqueue() returned something it never should
		 */
		errlog("addqueue returned bad value");
		exit(1);
	}
}

/*-------------------------------------------------------------------------
 * chksock
 *
 * Make sure that the bound socket is owned by the proper person. This is
 * only checked for control messages (which are very infrequent).
 *
 * The name checked is "prefix" followed by "jpid" in decimal.
 * Returns 1 when that name can be stat'ed and its owner is "juid",
 * 0 otherwise.
 *
 * NOTE(review): namebuf is 64 bytes and the length of prefix is not
 * checked here; callers pass the CNTRLPRE / CLIENTPRE constants, which
 * are assumed to be short enough.
 *-------------------------------------------------------------------------
 */

chksock(prefix, jpid, juid)
char *prefix;
u_long jpid;
int juid;
{
	char namebuf[64];
	struct stat statbuf;
	extern char *sprintf();

	/*
	 * jpid is a u_long, so it must be printed with %lu
	 */
	(void)sprintf(namebuf, "%s%lu", prefix, jpid);
	if (stat(namebuf, &statbuf) != 0)
		return(0);
	if ((unsigned int)statbuf.st_uid != juid)
		return(0);
	return(1);
}

/*-------------------------------------------------------------------------
 * chtimer
 *
 * Change the period of the interval timer to the number of seconds given
 * in the time field of the request. The interval timer forces the server
 * to look at the queue every n seconds to see if the load is low enough
 * to let some jobs run.
 *
 * The request is honored only when the sender's control socket is owned
 * by root. The sender is always answered: RUNCMD when the change was
 * accepted, STOPCMD when it was rejected or the timer could not be set.
 *-------------------------------------------------------------------------
 */

chtimer(work)
struct request *work;
{
	struct itimerval prevalrm;	/* old timer value, not used */
	extern struct itimerval startalrm;
	extern int timerstop;
	extern int newstatus;

	/*
	 * only a socket owned by root may change the timer
	 */
	if (chksock(CNTRLPRE, work->pid, 0) == 0){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * record the new period; the status file is now out of date
	 */
	startalrm.it_value.tv_sec = work->time;
	startalrm.it_interval.tv_sec = work->time;
	newstatus = 1;

	/*
	 * a stopped timer is left alone, it picks up the new period
	 * from startalrm when it is started again. A running timer is
	 * restarted with the new interval.
	 */
	if ((timerstop != 1) && (setitimer(ITIMER_REAL,&startalrm, &prevalrm) < 0)){
		errlog("start timer error");
		(void)outcntrl(work->pid, STOPCMD);
		cleanup();
	}

	/*
	 * tell the control program the command was successful
	 */
	(void)outcntrl(work->pid, RUNCMD);
}

/*------------------------------------------------------------------------
 * list
 *
 * If necessary update the list file with the current queue status,
 * then tell the client that the list file is up to date.
 * The data is stored in a file to avoid any chance the server could block
 * writing to a stopped control program.
 *
 * File format: the first line is the number of waiting clients, then one
 * line per queue entry (head first) holding uid, pid, time and command.
 *
 * The client is answered RUNCMD when the file is current and STOPCMD when
 * the file could not be opened or written. After a failure newlist is
 * left set so that the next request tries again.
 *
 * NOTE:
 * The users uids are NOT looked up in the passwd file. That must be done
 * by the programs that read the list file. Looking things up in the passwd
 * file is real expensive (even with dbm hashing) and this cannot be
 * afforded.
 *------------------------------------------------------------------------
 */
list(work)
struct request *work;
{
	register struct qnode *ptr;	/* pointer to walk through queue */
	FILE *out;			/* file where list will be written */
	int failed;			/* set when the file was not written */
	extern int newlist;
	extern int qcount;
	extern struct qnode *qhead;
	extern FILE *fopen();

	/*
	 * if the queue is the same as the last time it was listed in
	 * the file, just tell the client to read the file.
	 */
	if (newlist == 0){
		(void)outcntrl(work->pid, RUNCMD);
		return;
	}

	if ((out = fopen(LISTFILE, "w")) == NULL){
		errlog("list cannot open LISTFILE");
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * write out the number of waiting clients
	 */
	fprintf(out, "%d\n", qcount);

	/*
	 * write each queue entry
	 */
	for (ptr = qhead; ptr != QNIL; ptr = ptr->fow)
		fprintf(out,"%u %u %u %s\n",ptr->uid,ptr->pid,ptr->time,ptr->com);

	/*
	 * the error indicator has to be read before the stream is
	 * closed; fclose() itself reports a failure to flush what was
	 * still buffered.
	 */
	failed = ferror(out);
	if (fclose(out) == EOF)
		failed = 1;

	if (failed){
		/*
		 * the file is incomplete, do not tell the client to
		 * read it and do not mark the list as up to date.
		 */
		errlog("list cannot write LISTFILE");
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * set the flag to indicate that the list was updated.
	 */
	newlist = 0;

	/*
	 * tell the client to read the file
	 */
	(void)outcntrl(work->pid, RUNCMD);
}

/*-----------------------------------------------------------------------
 * outcntrl
 *
 * Send the one byte message "cmd" to the waiting control program whose
 * pid is "pid". Control sockets are always named CNTRLPRE followed by
 * the pid in decimal.
 *
 * Returns 0 when the datagram was sent, 1 when the sendto failed. On a
 * failure that indicates a dead or missing peer the socket name is
 * removed from the filesystem; any other failure is logged.
 *-----------------------------------------------------------------------
 */
outcntrl(pid, cmd)
u_long pid;
char cmd;
{
	int len;                 /* the size of the datagram header */
	struct sockaddr_un name; /* datagram recipient */
	extern int cntrlsock;
	extern char *sprintf();
	extern int errno;

	/*
	 * set up the address of the target of the message
	 * (pid is a u_long, so it must be printed with %lu)
	 */
	name.sun_family = AF_UNIX;
	(void)sprintf(name.sun_path, "%s%lu", CNTRLPRE, pid);
	len = strlen(name.sun_path) + sizeof(name.sun_family) + 1;

	if (sendto(cntrlsock, &cmd, sizeof(cmd), 0, (struct sockaddr *)&name, len) >= 0)
		return(0);

	/*
	 * If this point is reached:
	 *
	 * The sendto FAILED, either control died and left the old socket
	 * entry in the filesystem (so remove it) or terminated and
	 * cleaned up the old socket entry.
	 */
	if ((errno == ENOTSOCK) || (errno == ECONNREFUSED) || (errno == ENOENT)
	     || (errno == EPROTOTYPE))
		(void)unlink(name.sun_path);
	else
		errlog("outcntrl sendto failed");
	return(1);
}

/*-----------------------------------------------------------------------
 * outmsg
 *
 * Send the one byte message "cmd" to the waiting client whose pid is
 * "pid". Client sockets are always named CLIENTPRE followed by the
 * client's pid in decimal.
 *
 * Returns 0 when the datagram was sent, 1 when the sendto failed. On a
 * failure that indicates a dead or missing peer the socket name is
 * removed from the filesystem; any other failure is logged.
 *-----------------------------------------------------------------------
 */
outmsg(pid, cmd)
u_long pid;
char cmd;
{
	int len;                 /* the size of the datagram header */
	struct sockaddr_un name; /* datagram recipient */
	extern int msgsock;
	extern char *sprintf();
	extern int errno;

	/*
	 * set up the address of the target of the message
	 * (pid is a u_long, so it must be printed with %lu)
	 */
	name.sun_family = AF_UNIX;
	(void)sprintf(name.sun_path, "%s%lu",CLIENTPRE,pid);
	len = strlen(name.sun_path) + sizeof(name.sun_family) + 1;

	if (sendto(msgsock, &cmd, sizeof(cmd), 0, (struct sockaddr *)&name, len) >= 0)
		return(0);

	/*
	 * If this point is reached:
	 *
	 * The sendto FAILED, either client died and left the old socket
	 * entry in the filesystem (so remove it) or terminated and
	 * cleaned up the old socket entry.
	 */
	if ((errno == ENOTSOCK) || (errno == ECONNREFUSED) || (errno == ENOENT)
	     || (errno == EPROTOTYPE))
		(void)unlink(name.sun_path);
	else
		errlog("outmsg sendto failed");
	return(1);
}

/*------------------------------------------------------------------------
 * prall
 *
 * Remove ALL the waiting tasks in the queue. Each job is told to
 * terminate (STOPCMD) and is then taken out of the queue.
 * Only honored when the sender's control socket is owned by root. The
 * control program is answered RUNCMD once the queue is purged, STOPCMD
 * when the request is rejected.
 *------------------------------------------------------------------------
 */
prall(work)
struct request *work;
{
	register struct qnode *ptr;	/* node being removed */
	register struct qnode *prev;	/* node to visit next */
	extern struct qnode *qtail;

	/*
	 * make sure control socket is owned by root
	 * otherwise reject it
	 */
	if (chksock(CNTRLPRE, work->pid, 0) == 0){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * walk from the tail towards the head. The back link is saved
	 * BEFORE the node is handed to rmqueue(): once a node has been
	 * removed its links must not be trusted (rmqueue() is defined
	 * elsewhere and may release or reuse the node).
	 */
	for (ptr = qtail; ptr != QNIL; ptr = prev){
		prev = ptr->back;
		(void)outmsg(ptr->pid, STOPCMD);
		(void)rmqueue(ptr);
	}

	/*
	 * tell the control program the queue is purged
	 */
	(void)outcntrl(work->pid, RUNCMD);
}

/*--------------------------------------------------------------------------
 * prjob
 *
 * Remove one job from the queue. The pid of the job to remove is carried
 * in the time field of the request; the pid field is the sender. "port"
 * is the socket the request arrived on and selects both the ownership
 * check made on the sender's socket and the routine used to answer.
 *
 * The job is told to terminate and the sender gets RUNCMD. If the job is
 * not found, or the sender may not remove it, the sender gets STOPCMD.
 * A client removing itself (pid equals time, its "last breath") is taken
 * out of the queue silently.
 *--------------------------------------------------------------------------
 */
prjob(work, port)
register struct request *work;
int port;
{
	register struct qnode *node;
	int fromcntrl;		/* request came in on the control socket */
	extern struct qnode *qtail;
	extern int cntrlsock;
	extern int msgsock;

	fromcntrl = (port == cntrlsock);

	/*
	 * validate the socket the sender is bound to: a control
	 * program must be root, a client must own its own socket
	 */
	if (fromcntrl){
		if (chksock(CNTRLPRE, work->pid, 0) == 0){
			(void)outcntrl(work->pid, STOPCMD);
			return;
		}
	}else if (chksock(CLIENTPRE, work->pid, (int)work->uid) == 0){
		(void)outmsg(work->pid, STOPCMD);
		return;
	}

	/*
	 * look for the job, starting at the tail of the queue
	 */
	node = qtail;
	while ((node != QNIL) && (node->pid != work->time))
		node = node->back;

	if (node != QNIL){
		/*
		 * a client that was killed by a signal sends its own
		 * pid as its "last breath". It is dead by now, so it is
		 * just dropped from the queue and nobody is answered.
		 */
		if (work->pid == work->time){
			rmqueue(node);
			return;
		}

		/*
		 * otherwise only root or the owner of the job may
		 * remove it
		 */
		if ((work->uid == 0) || (node->uid == work->uid)){
			(void)outmsg(node->pid, STOPCMD);
			rmqueue(node);
			if (fromcntrl)
				(void)outcntrl(work->pid, RUNCMD);
			else
				(void)outmsg(work->pid, RUNCMD);
			return;
		}
	}

	/*
	 * the command failed. Tell the sender, unless this was a
	 * "last breath" message (should really never get here then!)
	 */
	if (fromcntrl)
		(void)outcntrl(work->pid, STOPCMD);
	else if (work->pid != work->time)
		(void)outmsg(work->pid, STOPCMD);
}

/*-------------------------------------------------------------------------
 * prusr
 *
 * Remove all the queued jobs that belong to the user named in the uid
 * field of the request. Each job removed is told to terminate.
 * Only root or the user can request his jobs to be removed
 * (the check on the user field must be done in the control program).
 *
 * The control program is answered RUNCMD when at least one job was
 * removed, STOPCMD when none was found or the request was rejected.
 *-------------------------------------------------------------------------
 */
prusr(work)
register struct request *work;
{
	register struct qnode *ptr;	/* node being examined */
	register struct qnode *prev;	/* node to visit next */
	int found = 0;
	extern struct qnode *qtail;

	/*
	 * check to see if this is a valid control program request
	 */
	if (chksock(CNTRLPRE, work->pid, 0) == 0){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * the back link is saved before a node can be handed to
	 * rmqueue(): the links of a removed node must not be trusted
	 * (rmqueue() is defined elsewhere and may release or reuse it).
	 */
	for (ptr = qtail; ptr != QNIL; ptr = prev){
		prev = ptr->back;
		/*
		 * found a job owned by that user.
		 */
		if (ptr->uid == work->uid){
			(void)outmsg(ptr->pid, STOPCMD);
			(void)rmqueue(ptr);
			found = 1;
		}
	}
	if (found == 1)
		(void)outcntrl(work->pid, RUNCMD);
	else
		(void)outcntrl(work->pid, STOPCMD);
}

/*-------------------------------------------------------------------------
 * runjob
 *
 * Let one queued job run REGARDLESS of the load. The pid of the job is
 * carried in the time field of the request. Only honored when the
 * sender's control socket is owned by root.
 *
 * The control program is answered RUNCMD when the job was found and
 * released, STOPCMD otherwise.
 *-------------------------------------------------------------------------
 */
runjob(work)
register struct request *work;
{
	register struct qnode *node;
	extern struct qnode *qtail;

	/*
	 * the sender has to be a control program run by root
	 */
	if (chksock(CNTRLPRE, work->pid, 0) == 0){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * search from the tail of the queue for the requested pid
	 */
	for (node = qtail; node != QNIL; node = node->back)
		if (node->pid == work->time)
			break;

	/*
	 * no such job in the queue
	 */
	if (node == QNIL){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * release the job, drop it from the queue and report success
	 */
	(void)outmsg(node->pid, RUNCMD);
	rmqueue(node);
	(void)outcntrl(work->pid, RUNCMD);
}

/*-------------------------------------------------------------------------
 * runusr
 *
 * Run all jobs owned by a user REGARDLESS of the load. The user is named
 * by the uid field of the request. Only honored when the sender's control
 * socket is owned by root.
 *
 * The control program is answered RUNCMD when at least one job was
 * released, STOPCMD when none was found or the request was rejected.
 *-------------------------------------------------------------------------
 */
runusr(work)
register struct request *work;
{
	register struct qnode *ptr;	/* node being examined */
	register struct qnode *prev;	/* node to visit next */
	int found = 0;
	extern struct qnode *qtail;

	/*
	 * check to see if this is a control program request
	 */
	if (chksock(CNTRLPRE, work->pid, 0) == 0){
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * the back link is saved before a node can be handed to
	 * rmqueue(): the links of a removed node must not be trusted
	 * (rmqueue() is defined elsewhere and may release or reuse it).
	 */
	for (ptr = qtail; ptr != QNIL; ptr = prev){
		prev = ptr->back;
		if (ptr->uid == work->uid){
			/*
			 * found a job owned by that user
			 */
			(void)outmsg(ptr->pid, RUNCMD);
			(void)rmqueue(ptr);
			found = 1;
		}
	}

	if (found == 1)
		(void)outcntrl(work->pid, RUNCMD);
	else
		(void)outcntrl(work->pid, STOPCMD);
}

/*-------------------------------------------------------------------------
 * status
 *
 * Update the status file. The status file contains the current setting
 * of the server parameters which can be changed by the control program.
 *
 * The file holds a single line with, in order: qcount, full, timerstop,
 * the seconds value of the interval timer, mqtime, errorcount and
 * loadlevel.
 *
 * The control program is answered RUNCMD when the file is current and
 * STOPCMD when the file could not be opened or written. After a failure
 * newstatus is left set so that the next request tries again.
 *-------------------------------------------------------------------------
 */
status(work)
struct request *work;
{
	FILE *out;
	int failed;		/* set when the file was not written */
	extern int errorcount;
	extern int newstatus;
	extern int qcount;
	extern int full;
	extern int timerstop;
#ifdef sun
	extern long loadlevel;
#else
	extern double loadlevel;
#endif /* sun */
	extern u_long mqtime;
	extern struct itimerval startalrm;
	extern FILE *fopen();

	/*
	 * status is the same since the last request.
	 */
	if (newstatus == 0){
		(void)outcntrl(work->pid, RUNCMD);
		return;
	}

	if ((out = fopen(STATUSFILE, "w")) == NULL){
		errlog("status cannot open STATUSFILE");
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	/*
	 * mqtime is a u_long and is printed with %lu. Only the
	 * conversion used for loadlevel differs between the two
	 * variants, to match its declaration above.
	 */
#ifdef sun
	fprintf(out,"%d %d %d %ld %lu %d %ld\n",qcount,full,timerstop,
#else
	fprintf(out,"%d %d %d %ld %lu %d %lf\n",qcount,full,timerstop,
#endif /* sun */
			startalrm.it_value.tv_sec,mqtime,errorcount,loadlevel);

	/*
	 * the error indicator has to be read before the stream is
	 * closed; fclose() itself reports a failure to flush what was
	 * still buffered.
	 */
	failed = ferror(out);
	if (fclose(out) == EOF)
		failed = 1;

	if (failed){
		/*
		 * the file is incomplete, do not tell the control
		 * program to read it and do not mark it up to date.
		 */
		errlog("status cannot write STATUSFILE");
		(void)outcntrl(work->pid, STOPCMD);
		return;
	}

	newstatus = 0;
	(void)outcntrl(work->pid, RUNCMD);
}
@//E*O*F server/commands.c//
chmod u=r,g=r,o=r server/commands.c
 
# Integrity check: compare the line, word and character counts of the
# extracted file with the counts recorded in this archive. The expected
# counts are fixed text, so a file that was changed after wrapping is
# reported as a mismatch.
echo Inspecting for damage in transit...
# scratch files: $temp holds the expected counts, $dtemp the diff output
temp=/tmp/shar$$; dtemp=/tmp/.shar$$
# remove the scratch files on exit and on signals 1, 2, 3 and 15
trap "rm -f $temp $dtemp; exit" 0 1 2 3 15
cat > $temp <<\!!!
     878    2737   20872 commands.c
!!!
# strip the directory part from the wc output, then compare it with the
# expected counts (diff -b ignores differences in the amount of blank space)
wc  server/commands.c | sed 's=[^ ]*/==' | diff -b $temp - >$dtemp
# a non-empty diff output means the counts do not match
if [ -s $dtemp ]
then echo "Ouch [diff of wc output]:" ; cat $dtemp
else echo "No problems found."
fi
exit 0



More information about the Comp.sources.unix mailing list