3 # py-rsbac - RSBAC Python bindings
4 # Copyright (C) 2006 Frederic Jolliton <pyrsbac@tuxee.net>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 __all__ = [ 'Group' , 'groups' ]
23 if not isinstance( o , basestring ) :
28 raise ImportError , 'duplicate name %r in module %r' % ( name , __name__ )
29 __all__.append( name )
32 from ctypes import byref, pointer
34 from rsbac import headers, lib, transaction
35 from rsbac._data import AclRequestVector
36 from rsbac.errors import raiseIfError, Error
37 from rsbac._utils import fetch, intToTtl, ttlToInt
39 g_aclSyscallArg = headers.rsbac_acl_syscall_arg_t()
40 g_aclSyscallArgRef = byref( g_aclSyscallArg )
42 g_aclSyscallByNameArg = headers.rsbac_acl_syscall_n_arg_t()
43 g_aclSyscallByNameArgRef = byref( g_aclSyscallByNameArg )
46 g_targetId = headers.rsbac_target_id_t()
48 def _acl_preset( target , tid , subjectType , subjectId , rights , ttl ) :
49 g_aclSyscallArg.target = target
50 g_aclSyscallArg.tid = tid
51 g_aclSyscallArg.subj_type = subjectType
52 g_aclSyscallArg.subj_id = subjectId
53 g_aclSyscallArg.rights = rights
54 g_aclSyscallArg.ttl = ttl
55 return g_aclSyscallArgRef
57 def _acl_preset_n( target , name , subjectType , subjectId , rights , ttl ) :
58 g_aclSyscallByNameArg.target = target
59 g_aclSyscallByNameArg.name = name
60 g_aclSyscallByNameArg.subj_type = subjectType
61 g_aclSyscallByNameArg.subj_id = subjectId
62 g_aclSyscallByNameArg.rights = rights
63 g_aclSyscallByNameArg.ttl = ttl
64 return g_aclSyscallByNameArgRef
66 def _to_acl_subject( object ) :
67 from rsbac.objects import User
68 if isinstance( object , Role ) :
69 return headers.ACLS_ROLE , object._role
70 elif isinstance( object , User ) :
71 return headers.ACLS_USER , object.uid
72 elif isinstance( object , Group ) :
73 return headers.ACLS_GROUP , object._id
75 raise RuntimeError , 'unexpected ACL subject of type %r' % ( object.__class__ , )
77 def _from_acl_subject( type , id ) :
78 if type == headers.ACLS_USER :
80 elif type == headers.ACLS_ROLE :
82 elif type == headers.ACLS_GROUP :
85 raise RuntimeError , 'unexpected subject type %r' % ( type , )
91 def setAclEntryById( target , subject , rights , ttl = None ) :
92 st , si = _to_acl_subject( subject )
93 raiseIfError( lib.rsbac_acl( transaction._t ,
94 headers.ACLC_set_acl_entry ,
95 _acl_preset( target.type , target._id._obj ,
97 int( rights ) , ttlToInt( ttl ) ) ) )
99 def setAclEntryByName( target , subject , rights , ttl = None ) :
100 st , si = _to_acl_subject( subject )
101 raiseIfError( lib.rsbac_acl_n( transaction._t ,
102 headers.ACLC_set_acl_entry ,
103 _acl_preset_n( target.type , target._id ,
105 int( rights ) , ttlToInt( ttl ) ) ) )
111 def removeAclEntryById( target , subject ) :
112 st , si = _to_acl_subject( subject )
113 raiseIfError( lib.rsbac_acl( transaction._t ,
114 headers.ACLC_remove_acl_entry ,
115 _acl_preset( target.type , target._id._obj ,
119 def removeAclEntryByName( target , subject ) :
120 st , si = _to_acl_subject( subject )
121 raiseIfError( lib.rsbac_acl_n( transaction._t ,
122 headers.ACLC_remove_acl_entry ,
123 _acl_preset_n( target.type , target._id ,
131 def removeAclById( target ) :
132 raiseIfError( lib.rsbac_acl( transaction._t ,
133 headers.ACLC_remove_acl ,
134 _acl_preset( target.type , target._id._obj ,
137 def removeAclByName( target ) :
138 raiseIfError( lib.rsbac_acl_n( transaction._t ,
139 headers.ACLC_remove_acl ,
140 _acl_preset_n( target.type , target._id ,
144 # remove_from_acl_entry
150 def setAclMaskById( target , mask ) :
151 raiseIfError( lib.rsbac_acl( transaction._t ,
152 headers.ACLC_remove_acl ,
153 _acl_preset( target.type , target._id._obj ,
157 def setAclMaskByName( target , mask ) :
158 raiseIfError( lib.rsbac_acl_n( transaction._t ,
159 headers.ACLC_remove_acl ,
160 _acl_preset_n( target.type , target._id ,
169 def aclRemoveUser( user ) :
170 """Remove user from all groups and all ACLs.
172 user -- UID as integer
175 g_targetId.user = user
176 raiseIfError( lib.rsbac_acl( transaction._t ,
177 headers.ACLC_remove_user ,
178 _acl_preset( headers.T_USER , g_targetId ,
185 def getAclRightsById( target , subject , effective = False ) :
186 st , si = _to_acl_subject( subject )
187 rights = headers.rsbac_acl_rights_vector_t()
188 raiseIfError( lib.rsbac_acl_get_rights( transaction._t ,
189 _acl_preset( target.type , target._id._obj ,
194 return AclRequestVector( rights.value )
196 def getAclRightsByName( target , subject , effective = False ) :
197 st , si = _to_acl_subject( subject )
198 rights = headers.rsbac_acl_rights_vector_t()
199 raiseIfError( lib.rsbac_acl_get_rights_n( transaction._t ,
200 _acl_preset_n( target.type , target._id ,
205 return AclRequestVector( rights.value )
211 def _unpackAclTlist( entries , ttls ) :
212 return [ ( _from_acl_subject( entry.subj_type , entry.subj_id ) ,
213 AclRequestVector( entry.rights ) ,
215 for entry , ttl in zip( entries , ttls ) ]
217 def getAclTargetListById( target ) :
218 arrs = fetch( ( headers.rsbac_acl_entry_t ,
219 headers.rsbac_time_t ) ,
220 lambda n , a , b : lib.rsbac_acl_get_tlist( transaction._t ,
225 return _unpackAclTlist( *arrs )
227 def getAclTargetListByName( target ) :
228 arrs = fetch( ( headers.rsbac_acl_entry_t ,
229 headers.rsbac_time_t ) ,
230 lambda n , a , b : lib.rsbac_acl_get_tlist_n( transaction._t ,
235 return _unpackAclTlist( *arrs )
241 def getAclMaskById( target ) :
242 rights = headers.rsbac_acl_rights_vector_t()
243 raiseIfError( lib.rsbac_acl_get_mask( transaction._t ,
244 target.type , target._id._obj ,
246 return AclRequestVector( rights.value )
248 def getAclMaskByName( target ) :
249 rights = headers.rsbac_acl_rights_vector_t()
250 raiseIfError( lib.rsbac_acl_get_mask_n( transaction._t ,
251 target.type , target._id ,
253 return AclRequestVector( rights.value )
260 def setAclEntry( target , subject , rights , ttl = None ) :
261 """Set an ACL entry for a target-subject couple.
264 subject -- either an instance of objects.User, acl.Group or rc.Role.
265 rights -- AclRequestVector
266 ttl -- None or a positive integer
270 return setAclEntryByName( target , subject , rights , ttl )
272 return setAclEntryById( target , subject , rights , ttl )
275 def removeAclEntry( target , subject ) :
276 """Remove an ACL entry.
279 subject -- either an instance of objects.User, acl.Group or rc.Role.
283 return removeAclEntryByName( target , subject )
285 return removeAclEntryById( target , subject )
288 def removeAcl( target ) :
289 """Remove all ACL entries on a target.
295 return removeAclByName( target )
297 return removeAclById( target )
300 def setAclMask( target , mask ) :
301 """Set ACL mask on a target.
304 mask -- AclRequestVector
308 return setAclMaskByName( target , mask )
310 return setAclMaskById( target , mask )
313 def getAclRights( target , subject , effective = False ) :
314 """Get ACL rights for a target-subject couple.
317 subject -- either an instance of objects.User, acl.Group or rc.Role.
318 effective -- True to retrieve effective rights, False otherwise.
320 Returns an AclRequestVector.
324 return getAclRightsByName( target , subject , effective )
326 return getAclRightsById( target , subject , effective )
329 def getAclMask( target ) :
330 """Get ACL mask for a target.
334 Returns an AclRequestVector.
338 return getAclMaskByName( target , mask )
340 return getAclMaskById( target , mask )
343 def getAclTargetList( target ) :
344 """Get all ACL entries for a target.
348 Returns a list of 3-tuple (Object, AclRequestVector, ttl).
352 return getAclTargetListByName( target )
354 return getAclTargetListById( target )
356 class AclBase( object ) :
357 __slots__ = ( 'target' , )
358 def __init__( self , target ) :
359 object.__init__( self )
361 def __len__( self ) :
362 return len( self._list() )
363 def __iter__( self ) :
364 return iter( self.keys() )
365 def __repr__( self ) :
366 return '<ACL for %s: %d entries>' % ( repr( self.target ).strip( '<>' ) , len( self ) )
368 return [ e[ 0 ] for e in self._list() ]
370 return [ ( e[ 1 ] , e[ 2 ] ) for e in self._list() ]
372 return [ ( e[ 0 ] , ( e[ 1 ] , e[ 2 ] ) ) for e in self._list() ]
374 class AclById( AclBase ) :
377 return getAclTargetListById( self.target )
378 def __getitem__( self , subject ) :
379 # FIXME: temporary solution. Waiting for an O(1) way to get
380 # (right,TTL) couple.
381 for k , v in self.items() :
384 #return AclRequestVector() , None
386 def __setitem__( self , subject , value ) :
387 if not isinstance( value , tuple ) :
390 setAclEntryById( self.target , subject , rights , ttl )
391 def __delitem__( self , subject ) :
392 removeAclEntryById( self.target , subject )
393 # def update( self ) : pass
395 # FIXME: temporary solution. Should sys_rsbac_remove_acl
396 # handle T_PROCESS instead?
397 if isinstance( self.target , Process ) :
398 map( self.__delitem__ , self )
400 removeAclById( self.target )
402 class AclByName( AclBase ) :
405 return getAclTargetListByName( self.target )
406 def __iter__( self ) :
407 return ( n[ 0 ] for n in self.__list() )
408 def __getitem__( self , subject ) :
409 # FIXME: temporary solution. Waiting for an O(1) way to get
410 # (right,TTL) couple.
411 for k , v in self.items() :
414 #return RequestVector() , None
416 def __setitem__( self , subject , value ) :
417 if not isinstance( value , tuple ) :
420 setAclEntryByName( self.target , subject , rights , ttl )
421 def __delitem__( self , subject ) :
422 removeAclEntryByName( self.target , subject )
424 removeAclByName( self.target )
426 class EffectiveAclById( AclBase ) :
429 return getAclTargetListById( self.target )
430 def __iter__( self ) :
431 return ( n[ 0 ] for n in self.__list() )
432 def __getitem__( self , subject ) :
433 return getAclRightsById( self.target , subject , True ) , False
435 class EffectiveAclByName( AclBase ) :
438 return getAclTargetListByName( self.target )
439 def __iter__( self ) :
440 return ( n[ 0 ] for n in self.__list() )
441 def __getitem__( self , subject ) :
442 return getAclRightsByName( self.target , subject , True ) , False
444 #--[ Group ]------------------------------------------------------------------
447 def addGroup( name , type = headers.ACLG_PRIVATE , id = None ) :
450 type -- type of group (either ACLG_GLOBAL or ACLG_PRIVATE)
451 name -- group name as string
452 id -- a positive integer or None (automatic ID)
454 Returns the ID of the new group.
459 id_ = headers.rsbac_acl_group_id_t()
461 arg = headers.rsbac_acl_group_syscall_arg_t()
462 arg.add_group.type = type
463 arg.add_group.name = name
464 arg.add_group.group_id_p = pointer( id_ )
465 raiseIfError( lib.rsbac_acl_group( transaction._t ,
466 headers.ACLGS_add_group ,
468 return int( id_.value )
471 def addGlobalGroup( name , id = None ) :
472 """Add an global ACL group.
474 name -- group name as string
475 id -- a positive integer or None (automatic ID)
477 Returns the ID of the new group.
480 return addGroup( name , headers.ACLG_GLOBAL , id )
483 def addPrivateGroup( name , id = None ) :
484 """Add an private ACL group.
486 name -- group name as string
487 id -- a positive integer or None (automatic ID)
489 Returns the ID of the new group.
492 return addGroup( name , headers.ACLG_PRIVATE , id )
495 def changeGroup( id , owner , type , name ) :
496 """Change ACL group attributes.
498 Change the owner, type and name of an existing ACL group.
500 id -- ACL group as integer
501 owner -- user as integer
502 type -- either headers.ACLG_PRIVATE or headers.ACLG_GLOBAL
506 arg = headers.rsbac_acl_group_syscall_arg_t()
507 arg.change_group.id = id
508 arg.change_group.owner = owner
509 arg.change_group.type = type
510 arg.change_group.name = name
511 raiseIfError( lib.rsbac_acl_group( transaction._t ,
512 headers.ACLGS_change_group ,
516 def removeGroup( id ) :
517 """Remove an ACL group.
519 id -- ACL group as integer
522 arg = headers.rsbac_acl_group_syscall_arg_t()
523 arg.remove_group.id = id
524 raiseIfError( lib.rsbac_acl_group( transaction._t ,
525 headers.ACLGS_remove_group ,
528 def _unpackEntry( entry ) :
529 return int( entry.id ) , int( entry.owner ) , int( entry.type ) , entry.name
532 def getGroupEntry( id ) :
533 """Get group entry attributes.
535 id -- ACL group as integer
537 Returns a tuple with 4 attributes: id, owner, type and name.
540 entry = headers.rsbac_acl_group_entry_t()
541 arg = headers.rsbac_acl_group_syscall_arg_t()
542 arg.get_group_entry.id = id
543 arg.get_group_entry.entry_p = pointer( entry )
544 raiseIfError( lib.rsbac_acl_group( transaction._t ,
545 headers.ACLGS_get_group_entry ,
547 return _unpackEntry( entry )
549 def _listGroups( includeGlobal , arr , n ) :
550 arg = headers.rsbac_acl_group_syscall_arg_t()
551 arg.list_groups.include_global = includeGlobal
552 arg.list_groups.group_entry_array = arr
553 arg.list_groups.maxnum = n
554 return raiseIfError( lib.rsbac_acl_group( transaction._t ,
555 headers.ACLGS_list_groups ,
558 # FIXME: Other groups may be viewed/changed while not listed by
559 # _listGroups (the groups belonging to other users.)
561 def getGroupList( includeGlobal = False ) :
562 """Get the list of ACL groups.
564 includeGlobal -- boolean. If True, include global groups.
566 Returns list of group ID.
569 arr = fetch( headers.rsbac_acl_group_entry_t ,
570 lambda n , a : _listGroups( includeGlobal , a , n ) ,
572 return map( _unpackEntry , arr )
575 def addGroupMember( group , user , ttl = None ) :
576 """Add a member to an ACL group.
578 group -- ACL group as integer
579 user -- user as integer
580 ttl -- None or a positive integer
583 arg = headers.rsbac_acl_group_syscall_arg_t()
584 arg.add_member.group = int( group )
585 arg.add_member.user = int( user )
586 arg.add_member.ttl = ttlToInt( ttl )
587 return raiseIfError( lib.rsbac_acl_group( transaction._t ,
588 headers.ACLGS_add_member ,
592 def removeGroupMember( group , user ) :
593 """Remove a member from an ACL group.
595 group -- ACL group as integer
596 user -- user as integer
599 arg = headers.rsbac_acl_group_syscall_arg_t()
600 arg.remove_member.group = int( group )
601 arg.remove_member.user = int( user )
602 return raiseIfError( lib.rsbac_acl_group( transaction._t ,
603 headers.ACLGS_remove_member ,
606 def _getUserGroups( user , groups , ttls , n ) :
607 arg = headers.rsbac_acl_group_syscall_arg_t()
608 arg.get_user_groups.user = user
609 arg.get_user_groups.group_array = groups
610 arg.get_user_groups.ttl_array = ttls
611 arg.get_user_groups.maxnum = n
612 return raiseIfError( lib.rsbac_acl_group( transaction._t ,
613 headers.ACLGS_get_user_groups ,
617 def getUserGroups( uid ) :
618 """Get the ACL groups to which a user belong.
620 uid -- user as integer
622 Returns a list of pair (id,ttl).
625 arrs = fetch( ( headers.rsbac_acl_group_id_t ,
626 headers.rsbac_time_t ) ,
627 lambda n , a , b : _getUserGroups( uid , a , b , n ) ,
629 return [ ( int( a ) , int( b ) ) for a , b in zip( *arrs ) ]
631 def _getGroupMembers( group , users , ttls , n ) :
632 arg = headers.rsbac_acl_group_syscall_arg_t()
633 arg.get_group_members.group = group
634 arg.get_group_members.user_array = users
635 arg.get_group_members.ttl_array = ttls
636 arg.get_group_members.maxnum = n
637 return raiseIfError( lib.rsbac_acl_group( transaction._t ,
638 headers.ACLGS_get_group_members ,
642 def getGroupMembers( id ) :
643 """Get the members of the given ACL group.
647 Returns a list of pair (uid,ttl).
650 arrs = fetch( ( headers.rsbac_uid_t ,
651 headers.rsbac_time_t ) ,
652 lambda n , a , b : _getGroupMembers( id , a , b , n ) ,
654 return [ ( int( a ) , intToTtl( b ) ) for a , b in zip( *arrs ) ]
656 class AclGroupMembersTtlDictProxy( object ) :
657 __slots__ = ( '__group' , )
658 def __init__( self , id ) :
660 def __getitem__( self , user ) :
661 for uid , ttl in getGroupMembers( self.__group ) :
667 def __setitem__( self , user , value ) :
668 addGroupMember( self.__group , user , value )
669 def __iter__( self ) :
670 return ( n[ 0 ] for n in getGroupMembers( self.__group ) )
671 def __repr__( self ) :
673 for user , ttl in getGroupMembers( self.__group ) :
675 r.append( str( user ) )
677 r.append( '%s(%ds)' % ( user , ttl ) )
678 return '<Members for ACL group %d: %s>' \
679 % ( self.__group , ', '.join( r ) or 'none' )
680 def add( self , user , ttl = None ) :
681 addGroupMember( self.__group , user , ttl )
682 def discard( self , user ) :
683 removeGroupMember( self.__group , user )
685 map( self.discard , self )
687 # FIXME: Add a note about settings (not settable individually
689 class Group( object ) :
690 __slots__ = ( '_id' , 'members' )
691 def __init__( self , id ) :
693 self.members = AclGroupMembersTtlDictProxy( id )
694 def __repr__( self ) :
696 entry = getGroupEntry( self._id )
697 assert entry[ 2 ] in ( headers.ACLG_GLOBAL , headers.ACLG_PRIVATE )
700 ( 'PRIVATE' , 'GLOBAL' )[ entry[ 2 ] == headers.ACLG_GLOBAL ] )
702 if e[ 0 ] != headers.RSBAC_ENOTFOUND :
705 return '<ACL Group [%d] %s>' % ( self._id , desc )
709 def getOwner( self ) :
710 return User( getGroupEntry( self._id )[ 1 ] )
711 def setOwner( self , value ) :
712 entry = getGroupEntry( self._id )
713 changeGroup( self._id , int( value ) , entry[ 2 ] , entry[ 3 ] )
714 owner = property( getOwner , setOwner )
716 # private (not really a boolean, but let's say it is)
718 def getPrivate( self ) :
719 entry = getGroupEntry( self._id )
721 assert type in ( headers.ACLG_GLOBAL , headers.ACLG_PRIVATE )
722 return type == headers.ACLG_PRIVATE
723 def setPrivate( self , value ) :
724 entry = getGroupEntry( self._id )
726 value = headers.ACLG_PRIVATE
728 value = headers.ACLG_GLOBAL
729 changeGroup( self._id , entry[ 1 ] , value , entry[ 3 ] )
730 private = property( getPrivate , setPrivate )
734 def getName( self ) :
735 return getGroupEntry( self._id )[ 3 ]
736 def setName( self , value ) :
737 entry = getGroupEntry( self._id )
738 changeGroup( self._id , entry[ 1 ] , entry[ 2 ] , value )
739 name = property( getName , setName )
741 class GroupDict( object ) :
743 def __iter__( self ) :
744 return ( item[ 0 ] for item in getGroupList( True ) )
745 def __getitem__( self , id ) :
747 def __delitem__( self , id ) :
748 return removeGroup( id )
749 def __repr__( self ) :
750 return '{' + ', '.join( '%s: %s' % ( a , b ) for a , b in self.items() ) + '}'
754 return map( Group , self )
756 return [ ( group , Group( group ) ) for group in self ]
760 from rsbac.rc import Role
761 from rsbac.objects import User, Process
764 # indent-tabs-mode: nil