#!/usr/bin/env python
+import os
from setuptools import setup, find_packages
-from os import name
scripts = [
'src/scripts/canari',
- 'src/scripts/pysudo',
'src/scripts/dispatcher',
]
+if os.name == 'posix':
+ scripts.append('src/scripts/pysudo')
+
extras = [
'readline'
]
-if name == 'nt':
+if os.name == 'nt':
scripts += ['%s.bat' % s for s in scripts]
setup(
name='canari',
author='Nadeem Douba',
- version='0.5',
+ version='0.6',
author_email='ndouba@gmail.com',
description='Rapid transform development and transform execution framework for Maltego.',
license='GPL',
#!/usr/bin/env python
-from canari.config import CanariConfigParser
+import os
+import sys
-from os import path, listdir, sep, environ, mkdir, pathsep, getcwd, walk
from pkg_resources import resource_filename
from distutils.dist import Distribution
from distutils.command.install import install
-from sys import path as pypath, platform
from datetime import datetime
from string import Template
+from canari.config import CanariConfigParser
+import threading
+
+def synchronized(func):
+
+ func.__lock__ = threading.RLock()
+
+ def synced_func(*args, **kws):
+ with func.__lock__:
+ return func(*args, **kws)
+
+ return synced_func
__author__ = 'Nadeem Douba'
__copyright__ = 'Copyright 2012, Canari Project'
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.1'
+__version__ = '0.2'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
def get_commands(module='canari.commands'):
- sc = __import__(module, globals(), locals(), fromlist=['__all__'])
commands = {}
+ sc = __import__(module, globals(), locals(), fromlist=['__all__'])
for c in sc.__all__:
m = __import__('%s.%s' % (module, c), globals(), locals(), fromlist=['run', 'help'])
- if 'run' in dir(m):
+ if 'run' in m.__dict__:
commands[cmd_name(m.__name__)] = m
return commands
def _detect_settings_dir(d):
- vs = [ i for i in listdir(d) if path.isdir(path.join(d, i)) if path.isdir(path.join(d, i, 'config'))]
+ vs = [ i for i in os.listdir(d) if os.path.isdir(os.path.join(d, i)) if os.path.isdir(os.path.join(d, i, 'config'))]
if len(vs) == 1:
- return path.join(d, vs[0])
+ return os.path.join(d, vs[0])
else:
while True:
print('Multiple versions of Maltego detected: ')
r = raw_input('Please select which version you wish to use [0]: ')
try:
if not r:
- return path.join(d, vs[0])
+ return os.path.join(d, vs[0])
elif int(r) < len(vs):
- return path.join(d, vs[int(r)])
+ return os.path.join(d, vs[int(r)])
except ValueError:
pass
print('Invalid selection... %s' % repr(r))
def detect_settings_dir():
d = None
- if platform.startswith('linux'):
- d = _detect_settings_dir(path.join(path.expanduser('~'), '.maltego'))
- elif platform == 'darwin':
- d = _detect_settings_dir(path.join(path.expanduser('~'), 'Library', 'Application Support', 'maltego'))
- elif platform == 'win32':
- d = _detect_settings_dir(path.join(environ['APPDATA'], '.maltego'))
+ if sys.platform.startswith('linux'):
+ d = _detect_settings_dir(os.path.join(os.path.expanduser('~'), '.maltego'))
+ elif sys.platform == 'darwin':
+ d = _detect_settings_dir(os.path.join(os.path.expanduser('~'), 'Library', 'Application Support', 'maltego'))
+ elif sys.platform == 'win32':
+ d = _detect_settings_dir(os.path.join(os.environ['APPDATA'], '.maltego'))
else:
- raise NotImplementedError('Unknown or unsupported OS: %s' % repr(platform))
+ raise NotImplementedError('Unknown or unsupported OS: %s' % repr(sys.platform))
return d
def build_skeleton(*args):
for d in args:
if isinstance(d, list):
- d = sep.join(d)
+ d = os.sep.join(d)
print('creating directory %s' % d)
- mkdir(d)
+ os.mkdir(d)
-def highlight(string, color, bold):
- attr = []
- if color == 'green':
- # green
- attr.append('32')
- elif color == 'red':
- # red
- attr.append('31')
- else:
- attr.append('30')
- if bold:
- attr.append('1')
- return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
+def highlight(s, color, bold):
+
+ if os.name == 'posix':
+ attr = []
+ if color == 'green':
+ # green
+ attr.append('32')
+ elif color == 'red':
+ # red
+ attr.append('31')
+ else:
+ attr.append('30')
+ if bold:
+ attr.append('1')
+ s = '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s)
+
+ return s
def croak(exc):
def fix_pypath():
- if '' not in pypath:
- pypath.insert(0, '')
+ if '' not in sys.path:
+ sys.path.insert(0, '')
def fix_binpath(paths):
if paths is not None:
if isinstance(paths, basestring):
- environ['PATH'] = paths
+ os.environ['PATH'] = paths
elif isinstance(paths, list):
- environ['PATH'] = pathsep.join(paths)
+ os.environ['PATH'] = os.pathsep.join(paths)
def import_transform(script):
root = project_root()
if root is not None:
- conf = path.join(root, '.canari')
- if path.exists(conf):
+ conf = os.path.join(root, '.canari')
+ if os.path.exists(conf):
c = CanariConfigParser()
c.read(conf)
return {
marker = '.canari'
for i in range(0, 5):
- if path.exists(marker):
- return path.dirname(path.realpath(marker))
- marker = '..%s%s' % (sep, marker)
+ if os.path.exists(marker):
+ return os.path.dirname(os.path.realpath(marker))
+ marker = '..%s%s' % (os.sep, marker)
print 'Unable to determine project root.'
exit(-1)
transforms=None
)
- for base, dirs, files in walk(root):
+ for base, dirs, files in os.walk(root):
if base.endswith('src'):
tree['src'] = base
elif 'resources' in dirs:
#!/usr/bin/env python
+import os
+import sys
+
+from argparse import ArgumentParser
+from traceback import format_exc
+
from canari.maltego.message import MaltegoException, MaltegoTransformResponseMessage
from common import croak, import_transform, cmd_name, console_message, fix_binpath
from canari.maltego.utils import onterminate, parseargs
from canari.config import config
-from os import execvp, geteuid, name
-from argparse import ArgumentParser
-from traceback import format_exc
-from sys import argv
-
__author__ = 'Nadeem Douba'
__copyright__ = 'Copyright 2012, Canari Project'
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.1'
+__version__ = '0.2'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
try:
m = import_transform(transform)
- if name == 'posix' and hasattr(m.dotransform, 'privileged') and geteuid():
- execvp('sudo', ['sudo'] + list(argv))
+ if os.name == 'posix' and hasattr(m.dotransform, 'privileged') and os.geteuid():
+ os.execvp('sudo', ['sudo'] + list(sys.argv))
exit(-1)
if hasattr(m, 'onterminate'):
#!/usr/bin/env python
-from canari.maltego.configuration import (MaltegoTransform, CmdCwdTransformProperty, CmdDbgTransformProperty,
- CmdLineTransformProperty, CmdParmTransformProperty, InputConstraint, TransformSet,
- TransformSettings, CmdCwdTransformPropertySetting, CmdDbgTransformPropertySetting,
- CmdLineTransformPropertySetting, CmdParmTransformPropertySetting)
-from common import detect_settings_dir, cmd_name, fix_pypath, get_bin_dir, import_transform, import_package, fix_etree
-from canari.maltego.message import ElementTree
+import os
+import sys
from pkg_resources import resource_filename, resource_listdir
from xml.etree.cElementTree import XML, SubElement
-from os import path, mkdir, chdir, getcwd, name
from argparse import ArgumentParser
from re import findall, sub
from zipfile import ZipFile
from string import Template
-from sys import stderr
+from canari.maltego.configuration import (MaltegoTransform, CmdCwdTransformProperty, CmdDbgTransformProperty,
+ CmdLineTransformProperty, CmdParmTransformProperty, InputConstraint, TransformSet,
+ TransformSettings, CmdCwdTransformPropertySetting, CmdDbgTransformPropertySetting,
+ CmdLineTransformPropertySetting, CmdParmTransformPropertySetting)
+from common import detect_settings_dir, cmd_name, fix_pypath, get_bin_dir, import_transform, import_package, fix_etree
+from canari.maltego.message import ElementTree
__author__ = 'Nadeem Douba'
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.2'
+__version__ = '0.3'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
'-w',
'--working-dir',
metavar='[working dir]',
- default=getcwd(),
+ default=os.getcwd(),
help='the path that will be used as the working directory for the transforms being installed (default: current working directory)'
)
parser.add_argument(
# Logic to install transforms
def install_transform(module, name, author, spec, prefix, working_dir):
- installdir = path.join(prefix, 'config', 'Maltego', 'TransformRepositories', 'Local')
+ installdir = os.path.join(prefix, 'config', 'Maltego', 'TransformRepositories', 'Local')
- if not path.exists(installdir):
- mkdir(installdir)
+ if not os.path.exists(installdir):
+ os.mkdir(installdir)
- setsdir = path.join(prefix, 'config', 'Maltego', 'TransformSets')
+ setsdir = os.path.join(prefix, 'config', 'Maltego', 'TransformSets')
for i,n in enumerate(spec.uuids):
if n in transforms:
- stderr.write('WARNING: Previous declaration of %s in transform %s. Overwriting...' % (n, module))
+ sys.stderr.write('WARNING: Previous declaration of %s in transform %s. Overwriting...' % (n, module))
else:
print ('Installing transform %s from %s...' % (n, module))
transforms[n] = module
sets = None
if spec.inputs[i][0] is not None:
- setdir = path.join(setsdir, spec.inputs[i][0])
- if not path.exists(setdir):
- mkdir(setdir)
- open(path.join(setdir, n), 'w').close()
+ setdir = os.path.join(setsdir, spec.inputs[i][0])
+ if not os.path.exists(setdir):
+ os.mkdir(setdir)
+ open(os.path.join(setdir, n), 'w').close()
sets=TransformSet(spec.inputs[i][0])
transform = MaltegoTransform(
transform.sets
- ElementTree(transform).write(path.join(installdir, '%s.transform' % n))
+ ElementTree(transform).write(os.path.join(installdir, '%s.transform' % n))
transformsettings = TransformSettings(properties=[
- CmdLineTransformPropertySetting(path.join(get_bin_dir(), 'dispatcher')),
+ CmdLineTransformPropertySetting(
+ os.path.join(get_bin_dir(),
+ 'dispatcher.bat' if os.name == 'nt' else 'dispatcher')
+ ),
CmdParmTransformPropertySetting(name),
CmdCwdTransformPropertySetting(working_dir),
CmdDbgTransformPropertySetting(spec.debug)
])
- ElementTree(transformsettings).write(path.join(installdir, '%s.transformsettings' % n))
+ ElementTree(transformsettings).write(os.path.join(installdir, '%s.transformsettings' % n))
def writeconf(sf, df, **kwargs):
- if not path.exists(df):
+ if not os.path.exists(df):
print ('Writing %s to %s' % (sf, df))
with file(df, mode='wb') as w:
if 'sub' in kwargs and kwargs['sub']:
def updateconf(c, f):
- ld = getcwd()
- chdir(path.dirname(f))
+ ld = os.getcwd()
+ os.chdir(os.path.dirname(f))
import canari.config as config
reload(config)
s = r.read()
with file(f, mode='wb') as w:
w.write(sub(r'configs\s*\=', 'configs = %s,' % c, s))
- chdir(ld)
+ os.chdir(ld)
def installconf(opts, args):
src = resource_filename('canari.resources.template', 'canari.plate')
writeconf(
src,
- path.join(opts.working_dir, 'canari.conf'),
+ os.path.join(opts.working_dir, 'canari.conf'),
sub=True,
command=' '.join(['canari install'] + args),
config=('%s.conf' % opts.package) if opts.package != 'canari' else '',
- path='${PATH},/usr/local/bin,/opt/local/bin' if name == 'posix' else ''
+ path='${PATH},/usr/local/bin,/opt/local/bin' if os.name == 'posix' else ''
)
if opts.package != 'canari':
src = resource_filename('%s.resources.etc' % opts.package, '%s.conf' % opts.package)
- writeconf(src, path.join(opts.working_dir, '%s.conf' % opts.package), sub=False)
- updateconf('%s.conf' % opts.package, path.join(opts.working_dir, 'canari.conf'))
+ writeconf(src, os.path.join(opts.working_dir, '%s.conf' % opts.package), sub=False)
+ updateconf('%s.conf' % opts.package, os.path.join(opts.working_dir, 'canari.conf'))
def installmtz(package, prefix):
try:
src = resource_filename('%s.resources.maltego' % package, 'entities.mtz')
- if not path.exists(src):
+ if not os.path.exists(src):
return
- prefix = path.join(prefix, 'config', 'Maltego', 'Entities')
+ prefix = os.path.join(prefix, 'config', 'Maltego', 'Entities')
z = ZipFile(src)
entities = filter(lambda x: x.endswith('.entity'), z.namelist())
data = z.open(e).read()
xml = XML(data)
category = xml.get('category')
- catdir = path.join(prefix, category)
- if not path.exists(catdir):
- mkdir(catdir)
- p = path.join(catdir, path.basename(e))
+ catdir = os.path.join(prefix, category)
+ if not os.path.exists(catdir):
+ os.mkdir(catdir)
+ p = os.path.join(catdir, os.path.basename(e))
print 'Installing entity %s to %s...' % (e, p)
with open(p, 'wb') as f:
f.write(data)
def installmachines(package, prefix):
try:
- prefix = path.join(prefix, 'config', 'Maltego', 'Machines')
- n = path.join(prefix, '.nbattrs')
+ prefix = os.path.join(prefix, 'config', 'Maltego', 'Machines')
+ n = os.path.join(prefix, '.nbattrs')
e = XML('<attributes version="1.0"/>')
- if path.exists(n):
+ if os.path.exists(n):
e = XML(file(n).read())
- if not path.exists(prefix):
- mkdir(prefix)
+ if not os.path.exists(prefix):
+ os.mkdir(prefix)
package = '%s.resources.maltego' % package
for m in filter(lambda x: x.endswith('.machine'), resource_listdir(package, '')):
src = resource_filename(package, m)
- dst = path.join(prefix, m)
+ dst = os.path.join(prefix, m)
print 'Installing machine %s to %s...' % (src, dst)
with open(dst, 'wb') as f:
data = file(src).read()
#!/usr/bin/env python
-from canari.maltego.message import (MaltegoTransformResponseMessage, MaltegoException,
- MaltegoTransformExceptionMessage, MaltegoMessage, Message)
-from common import cmd_name, import_transform, fix_binpath, fix_pypath, import_package
-from canari.config import config
+import os
+import sys
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from os import execvp, geteuid, name, path, fork
from xml.etree.cElementTree import fromstring
from SocketServer import ThreadingMixIn
from ssl import wrap_socket, CERT_NONE
from urlparse import urlsplit
from re import sub, findall
from hashlib import md5
-from sys import argv
+from canari.maltego.message import (MaltegoTransformResponseMessage, MaltegoException,
+ MaltegoTransformExceptionMessage, MaltegoMessage, Message)
+from common import cmd_name, import_transform, fix_binpath, fix_pypath, import_package
+from canari.config import config
__author__ = 'Nadeem Douba'
if opts.port == -1:
opts.port = 443 if not opts.disable_ssl else 80
- if name == 'posix' and geteuid() and (opts.port <= 1024 or opts.enable_privileged):
+ if os.name == 'posix' and os.geteuid() and (opts.port <= 1024 or opts.enable_privileged):
print ('You must run this server as root to continue...')
- execvp('sudo', ['sudo'] + list(argv))
+ os.execvp('sudo', ['sudo'] + list(sys.argv))
fix_binpath(config['default/path'])
if not hasattr(m2, 'dotransform'):
continue
- if name == 'posix' and hasattr(m2.dotransform, 'privileged') and (geteuid() or not opts.enable_privileged):
+ if os.name == 'posix' and hasattr(m2.dotransform, 'privileged') and (os.geteuid() or not opts.enable_privileged):
continue
if hasattr(m2.dotransform, 'remote') and m2.dotransform.remote:
server_address = (opts.listen_on, opts.port)
if not opts.disable_ssl:
- if not path.exists(opts.cert):
+ if not os.path.exists(opts.cert):
print ('The certificate file %s does not exist. Please create a PEM file...' % repr(opts.cert))
exit(-1)
print ('Making it secure (1337)...')
print ('Really? Over regular HTTP? What a shame...')
httpd = AsyncMaltegoHTTPServer(server_address=server_address, transforms=transforms, hostname=opts.hostname)
- if not opts.daemon or not fork():
+ if not opts.daemon or not os.fork():
try:
httpd.serve_forever()
except KeyboardInterrupt:
#!/usr/bin/env python
-from canari.maltego.message import MaltegoException, MaltegoTransformResponseMessage
-from common import cmd_name, import_transform, fix_binpath, get_bin_dir
-from canari.maltego.utils import onterminate, parseargs, croak, message
+import os
+import sys
-from os import execvp, geteuid, name, path
from argparse import ArgumentParser
from traceback import format_exc
+
+from canari.maltego.message import MaltegoException, MaltegoTransformResponseMessage
+from common import cmd_name, import_transform, fix_binpath, get_bin_dir
+from canari.maltego.utils import onterminate, parseargs, croak, message
from canari.config import config
-from sys import argv
__author__ = 'Nadeem Douba'
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.1'
+__version__ = '0.2'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
[transform, params, value, fields] = parseargs(['canari %s' % cmd_name(__name__)] + args)
m = None
- pysudo = path.join(get_bin_dir(), 'pysudo')
+ pysudo = os.path.join(get_bin_dir(), 'pysudo')
fix_binpath(config['default/path'])
try:
m = import_transform(transform)
- if name == 'posix' and hasattr(m.dotransform, 'privileged') and geteuid():
+ if os.name == 'posix' and hasattr(m.dotransform, 'privileged') and os.geteuid():
# Keep it for another day
# if platform == 'darwin':
# execvp(
# if sys.platform.startswith('linux') and path.exists("/usr/bin/gksudo"):
# execvp('/usr/bin/gksudo', ['/usr/bin/gksudo'] + list(sys.argv))
# else:
- execvp(pysudo, [pysudo] + list(argv))
+ os.execvp(pysudo, [pysudo] + list(sys.argv))
exit(-1)
if hasattr(m, 'onterminate'):
#!/usr/bin/env python
-from common import console_message, cmd_name, highlight, fix_pypath, fix_binpath, import_package
-from canari.maltego.message import MaltegoTransformResponseMessage
-from canari.config import config
+import os
+import sys
-from os import path, name, geteuid, execvp
from code import InteractiveConsole
from argparse import ArgumentParser
from atexit import register
-from sys import argv
-import readline
+
+from common import console_message, cmd_name, highlight, fix_pypath, fix_binpath, import_package
+from canari.maltego.message import MaltegoTransformResponseMessage
+from canari.config import config
__author__ = 'Nadeem Douba'
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.1'
+__version__ = '0.2'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
def __init__(self, mod):
self.mod = mod
- self.sudoargs = ['sudo'] + list(argv)
+ self.sudoargs = ['sudo'] + list(sys.argv)
def __call__(self, value, *args, **kwargs):
- if name == 'posix' and hasattr(self.mod.dotransform, 'privileged') and geteuid():
+ if os.name == 'posix' and hasattr(self.mod.dotransform, 'privileged') and os.geteuid():
print highlight("Need to be root to run this transform... sudo'ing...", 'green', True)
- execvp('sudo', self.sudoargs)
+ os.execvp('sudo', self.sudoargs)
return
return console_message(self.mod.dotransform(
type(
if getattr(mod, 'dotransform', ''):
transforms[name] = ShellCommand(mod)
InteractiveConsole.__init__(self, locals=transforms)
- self.init_history(path.expanduser('~/.mtgsh_history'))
+ self.init_history(os.path.expanduser('~/.mtgsh_history'))
def init_history(self, histfile):
- readline.parse_and_bind('tab: complete')
- if hasattr(readline, "read_history_file"):
- try:
- readline.read_history_file(histfile)
- except IOError:
- pass
- register(self.save_history, histfile)
-
- def save_history(self, histfile):
- readline.write_history_file(histfile)
- print ('bye!')
+ try:
+ import readline
+ readline.parse_and_bind('tab: complete')
+ if hasattr(readline, "read_history_file"):
+ try:
+ readline.read_history_file(histfile)
+ except IOError:
+ pass
+ register(lambda h: readline.write_history_file(h), histfile)
+ except ImportError:
+ pass
def run(args):
__credits__ = []
__license__ = 'GPL'
-__version__ = '0.2'
+__version__ = '0.3'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
def imagepath(pkg, name):
return '%s' % resource_filename(pkg, name)
+
def external_resource(name, pkg=None):
if pkg is None:
pkg = '%s.resources.external' % modulecallee().__name__.split('.')[0]
return resource_filename(pkg, name)
+def image_resource(name, pkg=None):
+ if pkg is None:
+ pkg = '%s.resources.images' % modulecallee().__name__.split('.')[0]
+ return imagepath(pkg, name)
+
+
+def icon_resource(name, pkg=None):
+ if pkg is None:
+ pkg = '%s.resources.images' % modulecallee().__name__.split('.')[0]
+ return imageicon(pkg, name)
+
+
# etc
conf = resource_filename(etc, 'canari.conf')
\ No newline at end of file
#!/usr/bin/env python
-from os import path, name, stat
+import os
+
from tempfile import gettempdir
from sys import maxint
from time import time
-if name == 'nt':
- from win32con import LOCKFILE_EXCLUSIVE_LOCK as LOCK_EX, LOCKFILE_FAIL_IMMEDIATELY as LOCK_NB
- from win32file import _get_osfhandle, LockFileEx, UnlockFileEx
- from pywintypes import OVERLAPPED, error as WinIOError
+if os.name == 'nt':
+ import msvcrt
+ from ctypes import *
+ from ctypes.wintypes import BOOL, DWORD, HANDLE
+
+ LOCK_SH = 0 # the default
+ LOCK_NB = 0x1 # LOCKFILE_FAIL_IMMEDIATELY
+ LOCK_EX = 0x2 # LOCKFILE_EXCLUSIVE_LOCK
+
+ # --- the code is taken from pyserial project ---
+ #
+ # detect size of ULONG_PTR
+ def is_64bit():
+ return sizeof(c_ulong) != sizeof(c_void_p)
+ if is_64bit():
+ ULONG_PTR = c_int64
+ else:
+ ULONG_PTR = c_ulong
+ PVOID = c_void_p
+
+ # --- Union inside Structure by stackoverflow:3480240 ---
+ class _OFFSET(Structure):
+ _fields_ = [
+ ('Offset', DWORD),
+ ('OffsetHigh', DWORD)]
+
+ class _OFFSET_UNION(Union):
+ _anonymous_ = ['_offset']
+ _fields_ = [
+ ('_offset', _OFFSET),
+ ('Pointer', PVOID)]
+
+ class OVERLAPPED(Structure):
+ _anonymous_ = ['_offset_union']
+ _fields_ = [
+ ('Internal', ULONG_PTR),
+ ('InternalHigh', ULONG_PTR),
+ ('_offset_union', _OFFSET_UNION),
+ ('hEvent', HANDLE)]
+
+ LPOVERLAPPED = POINTER(OVERLAPPED)
+
+ # --- Define function prototypes for extra safety ---
+ LockFileEx = windll.kernel32.LockFileEx
+ LockFileEx.restype = BOOL
+ LockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, DWORD, LPOVERLAPPED]
+ UnlockFileEx = windll.kernel32.UnlockFileEx
+ UnlockFileEx.restype = BOOL
+ UnlockFileEx.argtypes = [HANDLE, DWORD, DWORD, DWORD, LPOVERLAPPED]
else:
from fcntl import flock, LOCK_EX, LOCK_NB, LOCK_SH, LOCK_UN
__author__ = 'Nadeem Douba'
__copyright__ = 'Copyright 2012, Canari Project'
-__credits__ = []
+__credits__ = [ 'Jonathan Feinberg' ]
__license__ = 'GPL'
-__version__ = '0.1'
+__version__ = '0.2'
__maintainer__ = 'Nadeem Douba'
__email__ = 'ndouba@gmail.com'
__status__ = 'Development'
]
-if name == 'nt':
- LOCK_SH = 0
- LOCK_UN = 0
- __overlapped = OVERLAPPED()
-
+if os.name == 'nt':
def flock(file, flags):
- hfile = _get_osfhandle(file.fileno())
- try:
- if flags & LOCK_UN:
- UnlockFileEx(hfile, 0, -0x10000, __overlapped)
- else:
- LockFileEx(hfile, flags, 0, -0x10000, __overlapped)
- except WinIOError, exc:
- raise IOError('[Errno %d] %s' % (exc[0], exc[2]))
+ hfile = msvcrt.get_osfhandle(file.fileno())
+ overlapped = OVERLAPPED()
+ if flags & LOCK_UN and UnlockFileEx(hfile, 0, 0, 0xFFFF0000, byref(overlapped)):
+ return
+ elif flags & (LOCK_EX | LOCK_NB | LOCK_SH) and \
+ LockFileEx(hfile, flags, 0, 0, 0xFFFF0000, byref(overlapped)):
+ return
+ raise IOError(GetLastError())
def cookie(name):
- return path.join(gettempdir(), name)
+ return os.path.join(gettempdir(), name)
class fsemaphore(file):
class ufile(file):
def __init__(self, name):
- if path.exists(name):
- p, n = path.split(name)
- n, e = path.splitext(n)
+ if os.path.exists(name):
+ p, n = os.path.split(name)
+ n, e = os.path.splitext(n)
for i in xrange(2, maxint):
- name = path.join(p, '%s(%d)%s') % (n, i, e)
- if not path.exists(name):
+ name = os.path.join(p, '%s(%d)%s') % (n, i, e)
+ if not os.path.exists(name):
break
super(ufile, self).__init__(name, mode='wb')
def age(path):
- return time() - stat(path).st_mtime
\ No newline at end of file
+ return time() - os.stat(path).st_mtime
\ No newline at end of file
+++ /dev/null
-#!/usr/bin/env python
-
-__author__ = 'Nadeem Douba'
-__copyright__ = 'Copyright 2012, Canari Project'
-__credits__ = ['Nadeem Douba']
-
-__license__ = 'GPL'
-__version__ = '0.1'
-__maintainer__ = 'Nadeem Douba'
-__email__ = 'ndouba@gmail.com'
-__status__ = 'Development'
\ No newline at end of file