--- a/lib/ipfix.c +++ b/lib/ipfix.c @@ -37,6 +37,9 @@ $$LIC$$ #ifdef SCTPSUPPORT #include <netinet/sctp.h> #endif +#ifndef NOTHREADS +#include <pthread.h> +#endif #include <fcntl.h> #include <netdb.h> @@ -123,6 +126,18 @@ static uint16_t g_lasttid; static ipfix_datarecord_t g_data = { NULL, NULL, 0 }; /* ipfix_export */ static ipfix_field_t *g_ipfix_fields; +#ifndef NOTHREADS +static pthread_mutex_t g_mutex; +#define mod_lock() { \ + if ( pthread_mutex_lock( &g_mutex ) !=0 ) \ + mlogf( 0, "[ipfix] mutex_lock() failed: %s\n", \ + strerror( errno ) ); \ + } +#define mod_unlock() { pthread_mutex_unlock( &g_mutex ); } +#else +#define mod_lock() +#define mod_unlock() +#endif /*----- prototypes -------------------------------------------------------*/ @@ -133,6 +148,7 @@ int _ipfix_send_message( ipfix_t *ifh, ipfix_message_t *message ); int _ipfix_write_msghdr( ipfix_t *ifh, ipfix_message_t *msg, iobuf_t *buf ); void _ipfix_disconnect( ipfix_collector_t *col ); +int _ipfix_export_flush( ipfix_t *ifh ); /* name : do_writeselect @@ -576,16 +592,18 @@ int ipfix_decode_float( void *in, void * int ipfix_snprint_float( char *str, size_t size, void *data, size_t len ) { - float tmp32; - double tmp64; + uint32_t tmp32; + uint64_t tmp64; switch ( len ) { case 4: - ipfix_decode_float( data, &tmp32, 4); - return snprintf( str, size, "%f", tmp32 ); + memcpy( &tmp32, data, len ); + tmp32 = htonl( tmp32 ); + return snprintf( str, size, "%f", (float)tmp32 ); case 8: - ipfix_decode_float( data, &tmp64, 8); - return snprintf( str, size, "%lf", tmp64); + memcpy( &tmp64, data, len ); + tmp64 = HTONLL( tmp64 ); + return snprintf( str, size, "%lf", (double)tmp64 ); default: break; } @@ -682,12 +700,19 @@ int ipfix_get_eno_ieid( char *field, int * parameters: * remarks: init module, read field type info. */ -int ipfix_init ( void ) +int ipfix_init( void ) { if ( g_tstart ) { ipfix_cleanup(); } +#ifndef NOTHREADS + if ( pthread_mutex_init( &g_mutex, NULL ) !=0 ) { + mlogf( 0, "[ipfix] pthread_mutex_init() failed: %s\n", + strerror(errno) ); + return -1; + } +#endif g_tstart = time(NULL); signal( SIGPIPE, SIG_IGN ); g_lasttid = 255; @@ -806,6 +831,9 @@ void ipfix_cleanup ( void ) g_data.maxfields = 0; g_data.lens = NULL; g_data.addrs = NULL; +#ifndef NOTHREADS + (void)pthread_mutex_destroy( &g_mutex ); +#endif } int _ipfix_connect ( ipfix_collector_t *col ) @@ -1465,7 +1493,7 @@ int _ipfix_write_template( ipfix_t default: /* check space */ if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN ) { - if ( ipfix_export_flush( ifh ) < 0 ) + if ( _ipfix_export_flush( ifh ) < 0 ) return -1; if ( tsize+ifh->offset > IPFIX_DEFAULT_BUFLEN ) return -1; @@ -1474,6 +1502,8 @@ int _ipfix_write_template( ipfix_t /* write template prior to data */ if ( ifh->offset > 0 ) { memmove( ifh->buffer + tsize, ifh->buffer, ifh->offset ); + if ( ifh->cs_tid ) + ifh->cs_header += tsize; } buf = ifh->buffer; @@ -1615,8 +1645,11 @@ int ipfix_open( ipfix_t **ipfixh, int so return -1; } node->ifh = i; + + mod_lock(); node->next = g_ipfixlist; g_ipfixlist = node; + mod_unlock(); *ipfixh = i; return 0; @@ -1633,7 +1666,8 @@ void ipfix_close( ipfix_t *h ) { ipfix_node_t *l, *n; - ipfix_export_flush( h ); + mod_lock(); + _ipfix_export_flush( h ); while( h->collectors ) _ipfix_drop_collector( (ipfix_collector_t**)&h->collectors ); @@ -1659,6 +1693,7 @@ void ipfix_close( ipfix_t *h ) #endif free(h->buffer); free(h); + mod_unlock(); } } @@ -2156,6 +2191,22 @@ void ipfix_release_template( ipfix_t *if ipfix_delete_template( ifh, templ ); } +static void _finish_cs( ipfix_t *ifh ) +{ + size_t buflen; + uint8_t *buf; + + /* finish current dataset */ + if ( (buf=ifh->cs_header) ==NULL ) + return; + buflen = 0; + INSERTU16( buf+buflen, buflen, ifh->cs_tid ); + INSERTU16( buf+buflen, buflen, ifh->cs_bytes ); + ifh->cs_bytes = 0; + ifh->cs_header = NULL; + ifh->cs_tid = 0; +} + int ipfix_export( ipfix_t *ifh, ipfix_template_t *templ, ... ) { int i; @@ -2199,13 +2250,14 @@ int ipfix_export( ipfix_t *ifh, ipfix_te g_data.addrs, g_data.lens ); } -int ipfix_export_array( ipfix_t *ifh, - ipfix_template_t *templ, - int nfields, - void **fields, - uint16_t *lengths ) +static int +_ipfix_export_array( ipfix_t *ifh, + ipfix_template_t *templ, + int nfields, + void **fields, + uint16_t *lengths ) { - int i; + int i, newset_f=0; size_t buflen, datasetlen; uint8_t *p, *buf; @@ -2249,7 +2301,19 @@ int ipfix_export_array( ipfix_t /** get size of data set, check space */ - for ( i=0, datasetlen=4; i<nfields; i++ ) { + if ( templ->tid == ifh->cs_tid ) { + newset_f = 0; + datasetlen = 0; + } + else { + if ( ifh->cs_tid > 0 ) { + _finish_cs( ifh ); + } + newset_f = 1; + datasetlen = 4; + } + + for ( i=0; i<nfields; i++ ) { if ( templ->fields[i].flength == IPFIX_FT_VARLEN ) { if ( lengths[i]>254 ) datasetlen += 3; @@ -2263,21 +2327,29 @@ int ipfix_export_array( ipfix_t } datasetlen += lengths[i]; } - if ( ((ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN ) - && (ipfix_export_flush( ifh ) <0) ) { - return -1; + + if ( (ifh->offset + datasetlen) > IPFIX_DEFAULT_BUFLEN ) { + if ( ifh->cs_tid ) + _finish_cs( ifh ); + newset_f = 1; + + if ( _ipfix_export_flush( ifh ) <0 ) + return -1; } - /* fill buffer - */ + /* fill buffer */ buf = (uint8_t*)(ifh->buffer) + ifh->offset; buflen = 0; - /* insert data set - */ - ifh->nrecords ++; - INSERTU16( buf+buflen, buflen, templ->tid ); - INSERTU16( buf+buflen, buflen, datasetlen ); + if ( newset_f ) { + /* insert data set + */ + ifh->cs_bytes = 0; + ifh->cs_header = buf; + ifh->cs_tid = templ->tid; + INSERTU16( buf+buflen, buflen, templ->tid ); + INSERTU16( buf+buflen, buflen, 4 ); + } /* insert data record */ @@ -2303,7 +2375,9 @@ int ipfix_export_array( ipfix_t buflen += lengths[i]; } + ifh->nrecords ++; ifh->offset += buflen; + ifh->cs_bytes += buflen; if ( ifh->version == IPFIX_VERSION ) ifh->seqno ++; return 0; @@ -2313,7 +2387,7 @@ int ipfix_export_array( ipfix_t * parameters: * remarks: rewrite this func! */ -int ipfix_export_flush( ipfix_t *ifh ) +int _ipfix_export_flush( ipfix_t *ifh ) { iobuf_t *buf; ipfix_collector_t *col; @@ -2322,8 +2396,14 @@ int ipfix_export_flush( ipfix_t *ifh ) if ( (ifh==NULL) || (ifh->offset==0) ) return 0; - if ( (buf=_ipfix_getbuf()) ==NULL ) + if ( ifh->cs_tid > 0 ) { + /* finish current dataset */ + _finish_cs( ifh ); + } + + if ( (buf=_ipfix_getbuf()) ==NULL ) { return -1; + } #ifdef DEBUG mlogf( 0, "[ipfix_export_flush] msg has %d records, %d bytes\n", @@ -2350,3 +2430,30 @@ int ipfix_export_flush( ipfix_t *ifh ) _ipfix_freebuf( buf ); return ret; } + +int ipfix_export_array( ipfix_t *ifh, + ipfix_template_t *templ, + int nfields, + void **fields, + uint16_t *lengths ) +{ + int ret; + + mod_lock(); + ret = _ipfix_export_array( ifh, templ, nfields, fields, lengths ); + mod_unlock(); + + return ret; +} + +int ipfix_export_flush( ipfix_t *ifh ) +{ + int ret; + + mod_lock(); + ret = _ipfix_export_flush( ifh ); + mod_unlock(); + + return ret; +} + --- a/lib/ipfix.h +++ b/lib/ipfix.h @@ -142,6 +142,12 @@ typedef struct int nrecords; /* no. of records in buffer */ size_t offset; /* output buffer fill level */ uint32_t seqno; /* sequence no. of next message */ + + /* experimental */ + int cs_tid; /* template id of current dataset */ + int cs_bytes; /* size of current set */ + uint8_t *cs_header; /* start of current set */ + } ipfix_t; /** exporter funcs --- a/lib/ipfix_col.c +++ b/lib/ipfix_col.c @@ -907,7 +907,7 @@ int ipfix_decode_datarecord( ipfixt_node return 0; } -static void do_free_datarecord( ipfix_datarecord_t *data ) +void ipfix_free_datarecord( ipfix_datarecord_t *data ) { if ( data ) { if ( data->addrs ) @@ -925,6 +925,7 @@ int ipfix_parse_msg( ipfix_input_t *inpu ipfix_hdr_t hdr; /* ipfix packet header */ ipfixs_node_t *s; ipfix_datarecord_t data = { NULL, NULL, 0 }; + ipfixe_node_t *e; uint8_t *buf; /* ipfix payload */ uint16_t setid, setlen; /* set id, set lenght */ int i, nread, offset; /* counter */ @@ -1042,6 +1043,12 @@ int ipfix_parse_msg( ipfix_input_t *inpu err_flag = 1; } else { + for ( e=g_exporter; e!=NULL; e=e->next ) { + if ( e->elem->export_dset ) + (void) e->elem->export_dset( t, buf+nread, setlen, + e->elem->data ); + } + /** read data records */ for ( offset=nread, bytesleft=setlen; bytesleft>4; ) { @@ -1076,11 +1083,11 @@ int ipfix_parse_msg( ipfix_input_t *inpu goto errend; end: - do_free_datarecord( &data ); + ipfix_free_datarecord( &data ); return nread; errend: - do_free_datarecord( &data ); + ipfix_free_datarecord( &data ); return -1; } @@ -1093,7 +1100,7 @@ void process_client_tcp( int fd, int mas tcp_conn_t *tcon = (tcp_conn_t*)data; char *func = "process_client_tcp"; - mlogf( 3, "[%s] fd %d mask %d called.\n", func, fd, mask ); + mlogf( 4, "[%s] fd %d mask %d called.\n", func, fd, mask ); /** read ipfix header */ --- a/lib/ipfix_col.h +++ b/lib/ipfix_col.h @@ -88,6 +88,7 @@ typedef struct ipfix_col_info int (*export_newsource)(ipfixs_node_t*,void*); int (*export_newmsg)(ipfixs_node_t*,ipfix_hdr_t*,void*); int (*export_trecord)(ipfixs_node_t*,ipfixt_node_t*,void*); + int (*export_dset)(ipfixt_node_t*,uint8_t*,size_t,void*); int (*export_drecord)(ipfixs_node_t*,ipfixt_node_t*, ipfix_datarecord_t*,void*); void (*export_cleanup)(void*); --- a/lib/ipfix_col_files.c +++ b/lib/ipfix_col_files.c @@ -68,7 +68,7 @@ static int export_newsource_file( ipfixs return -1; } snprintf( s->fname+strlen(s->fname), PATH_MAX-strlen(s->fname), - "/%u", s->odid ); + "/%u", (unsigned int)s->odid ); if ( (access( s->fname, R_OK ) <0 ) && (mkdir( s->fname, S_IRWXU ) <0) ) { mlogf( 0, "[%s] cannot access dir '%s': %s\n",