Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Classical-MD_openpathsampling
TSE
Commits
42c738b6
Commit
42c738b6
authored
Nov 21, 2016
by
Sander Roet
Browse files
working version
parent
f4093b4f
Changes
11
Hide whitespace changes
Inline
Side-by-side
.gitignore
View file @
42c738b6
*.nc
*.egg-info
examples
*.swp
ops_TSE/tests/__init__.py
deleted
100644 → 0
View file @
f4093b4f
ops_TSE/transitionstate_ensemble.py
deleted
100644 → 0
View file @
f4093b4f
#!/usr/bin/env python2
from
openpathsampling.pathsimulator
import
PathSimulator
import
openpathsampling
as
paths
from
openpathsampling.analysis.shooting_point_analysis
import
\
SnapshotByCoordinateDict
from
simtk
import
unit
import
sys
import
logging.config
logging
.
config
.
fileConfig
(
"./logging.conf"
,
disable_existing_loggers
=
False
)
class
InputError
(
Exception
):
def
__init__
(
self
,
msg
):
self
.
msg
=
msg
class
getTrajectoriesPerShootingPoint
(
SnapshotByCoordinateDict
):
'''
This is inspired on ShootingPointAnalysis, but returns a dict of
trajectories
'''
# TODO: propper description
def
__init__
(
self
,
steps
,
states
):
super
(
getTrajectoriesPerShootingPoint
,
self
).
__init__
()
for
step
in
steps
:
details
=
step
.
change
.
canonical
.
trials
[
0
].
details
shooting_snap
=
details
.
shooting_snapshot
key
=
shooting_snap
trial_traj
=
step
.
change
.
canonical
.
trials
[
0
].
trajectory
init_traj
=
details
.
initial_trajectory
test_points
=
[
s
for
s
in
[
trial_traj
[
0
],
trial_traj
[
-
1
]]
if
s
not
in
[
init_traj
[
0
],
init_traj
[
-
1
]]]
# Test point should have lenght 1
test_point
=
test_points
[
0
]
for
state
in
states
:
if
state
(
test_point
):
try
:
tmp
=
self
[
key
]
except
KeyError
:
self
[
key
]
=
{
state
:
[]}
try
:
self
[
key
][
state
].
append
(
trial_traj
)
except
KeyError
:
self
[
key
][
state
]
=
[
trial_traj
]
class
GenTrajVelocities
(
PathSimulator
):
calc_name
=
"genTrajVelocities"
def
__init__
(
self
,
trajectory
=
None
,
StateA
=
None
,
StateB
=
None
,
engine
=
None
,
storage
=
None
,
n_per_snap
=
10
,
temperature
=
310
*
unit
.
kelvin
,
n_min
=
1
,
snap_frame
=
None
,
pB_min
=
0
,
pB_max
=
1
):
# TODO: Add loggers
# TODO: Add documentation
super
(
PathSimulator
,
self
).
__init__
()
self
.
trajectory
=
trajectory
self
.
StateA
=
StateA
self
.
StateB
=
StateB
self
.
engine
=
engine
self
.
n_per_snap
=
n_per_snap
self
.
subtrajectories
=
[]
self
.
storage
=
storage
self
.
n_min
=
n_min
self
.
output
=
sys
.
stdout
self
.
snap_frame
=
snap_frame
self
.
pB_min
=
pB_min
self
.
pB_max
=
pB_max
if
isinstance
(
engine
,
paths
.
engines
.
toy
.
Engine
):
try
:
temperature
.
unit
except
AttributeError
:
kb
=
1
else
:
kb
=
unit
.
BOLTZMANN_CONSTANT_kB
elif
isinstance
(
engine
,
paths
.
engines
.
openmm
.
engine
.
OpenMMEngine
):
try
:
temperature
.
unit
except
AttributeError
:
temperature
*=
unit
.
kelvin
kb
=
unit
.
BOLTZMANN_CONSTANT_kB
else
:
raise
NotImplementedError
(
"Only implemented for the toy and "
+
"OpenMM engines."
)
self
.
temperature
=
temperature
# Regular calculation for beta if masses is in dalton (OpenMM Standard)
self
.
beta
=
1.0
/
(
temperature
*
kb
)
#self.beta = 1.0
# If using toysnapshots (without masses) this is needed for randomizer
# to work.
#try:
# trajectory[0].masses[0].unit
#except AttributeError:
# self.beta *= unit.dalton
def
get_subtrajectories
(
self
):
# TODO: Fancy subtrajectory class
network
=
paths
.
TPSNetwork
.
from_state_pairs
([(
self
.
StateA
,
self
.
StateB
)])
subtrajectories
=
[]
ensembles
=
network
.
analysis_ensembles
[
0
]
for
subtrajectory
in
ensembles
.
split
(
self
.
trajectory
):
subtrajectories
.
append
(
subtrajectory
)
if
len
(
subtrajectories
)
==
0
:
for
subtrajectory
in
ensembles
.
split
(
self
.
trajectory
[::
-
1
]):
subtrajectories
.
append
(
subtrajectory
)
if
len
(
subtrajectories
)
==
0
:
raise
InputError
(
"Input trajectory has no transition from StateA to
\
StateB"
)
self
.
subtrajectories
=
subtrajectories
return
subtrajectories
def
next_frame
(
self
,
n_stateA
=
0.0
,
n_stateB
=
0.0
,
pB_min
=
0
,
pB_max
=
1
,
snap_min
=
''
,
snap_frame
=
''
,
snap_max
=
''
):
if
snap_min
==
''
:
snap_min
=
self
.
snap_min
if
snap_frame
==
''
:
snap_frame
=
self
.
snap_frame
if
snap_max
==
''
:
snap_max
=
self
.
snap_max
n_stateA
=
float
(
n_stateA
)
n_stateB
=
float
(
n_stateB
)
# TODO: write docs
#print n_stateA, n_stateB
pB
=
n_stateB
/
(
n_stateA
+
n_stateB
)
self
.
output
.
write
(
"Committor is: "
+
str
(
pB
)
+
"
\n
"
)
self
.
output
.
flush
()
if
pB_min
<
pB
<
pB_max
:
self
.
snap_frame
=
None
return
if
pB
<=
pB_min
:
snap_min
=
snap_frame
if
pB
>=
pB_max
:
snap_max
=
snap_frame
snap_frame
=
snap_min
+
((
snap_max
-
snap_min
)
/
2
)
if
snap_frame
<=
snap_min
or
snap_frame
>=
snap_max
:
if
pB_min
>=
pB_max
:
raise
ValueError
(
"pB_max should be higher than pB_min"
)
else
:
raise
RuntimeError
(
"Shooting frame is "
+
"outside of shooting range, you might want "
+
"to increase n_per_snap."
)
else
:
self
.
snap_frame
=
snap_frame
self
.
snap_min
=
snap_min
self
.
snap_max
=
snap_max
def
run
(
self
):
# TODO: some fancy switching if more than one transition is present
randomizer
=
paths
.
RandomVelocities
(
beta
=
self
.
beta
)
subtrajectory
=
self
.
get_subtrajectories
()[
0
]
self
.
output
.
write
(
"Lenght of the input trajectory is: "
+
str
(
len
(
self
.
trajectory
))
+
"
\n
"
)
self
.
output
.
write
(
"Length of the first subtrajectory: "
+
str
(
len
(
subtrajectory
))
+
"
\n
"
)
self
.
snap_min
=
0
self
.
snap_max
=
len
(
subtrajectory
)
-
1
StateA
=
self
.
StateA
StateB
=
self
.
StateB
if
self
.
snap_frame
==
None
:
self
.
snap_frame
=
((
self
.
snap_max
-
self
.
snap_min
)
/
2
)
while
self
.
snap_frame
is
not
None
:
# pragma: no cover
self
.
output
.
write
(
"Shooting from frame:"
+
str
(
self
.
snap_frame
)
+
"
\n
"
)
self
.
output
.
flush
()
snap
=
subtrajectory
[
self
.
snap_frame
]
simulation
=
paths
.
CommittorSimulation
(
storage
=
self
.
storage
,
engine
=
self
.
engine
,
states
=
[
StateA
,
StateB
],
randomizer
=
randomizer
,
direction
=
2
,
initial_snapshots
=
[
snap
]
)
simulation
.
run
(
n_per_snapshot
=
self
.
n_per_snap
)
try
:
results
=
paths
.
ShootingPointAnalysis
(
steps
=
self
.
storage
.
steps
,
states
=
[
StateA
,
StateB
]
)
except
AssertionError
:
raise
RuntimeError
(
"None of the tries entered a state, you
\
might need to increase n_frames_max of the
\
engine."
)
result
=
results
[
snap
]
self
.
next_frame
(
n_stateA
=
result
[
StateA
],
n_stateB
=
result
[
StateB
],
pB_min
=
self
.
pB_min
,
pB_max
=
self
.
pB_max
)
self
.
output
.
write
(
"Found transition
\n
"
)
self
.
output
.
flush
()
traj_dict
=
getTrajectoriesPerShootingPoint
(
steps
=
self
.
storage
.
steps
,
states
=
[
StateA
,
StateB
])
trajA
=
traj_dict
[
snap
][
StateA
][
0
]
trajB
=
traj_dict
[
snap
][
StateB
][
0
]
return
trajA
[::
-
1
]
+
trajB
[
1
:]
ops_tse/__init__.py
0 → 100644
View file @
42c738b6
from
transitionstate_ensemble
import
TransitionStateEnsemble
ops_tse/__init__.pyc
0 → 100644
View file @
42c738b6
File added
ops_
TSE
/logging.conf
→
ops_
tse
/logging.conf
View file @
42c738b6
File moved
ops_
TSE
/__init__.py
→
ops_
tse/tests
/__init__.py
View file @
42c738b6
File moved
ops_tse/transitionstate_ensemble.py
0 → 100644
View file @
42c738b6
#!/usr/bin/env python2
from
openpathsampling.pathsimulator
import
PathSimulator
import
openpathsampling
as
paths
#import logging.config
#logging.config.fileConfig("./logging.conf", disable_existing_loggers=False)
class
TransitionStateEnsemble
(
PathSimulator
):
calc_name
=
"TransitionStateEnsemble"
def
__init__
(
self
,
trajectories
,
StateA
,
StateB
,
engine
,
storage
,
randomizer
,
n_per_snap
=
20
,
pB_min
=
0.4
,
pB_max
=
0.6
):
# TODO: Add loggers
# TODO: Add documentation
super
(
TransitionStateEnsemble
,
self
).
__init__
(
storage
)
self
.
trajectories
=
trajectories
self
.
StateA
=
StateA
self
.
StateB
=
StateB
self
.
engine
=
engine
self
.
n_per_snap
=
n_per_snap
self
.
storage
=
storage
self
.
pB_min
=
pB_min
self
.
pB_max
=
pB_max
self
.
pB
=
0.0
self
.
snap_frame
=
None
self
.
randomizer
=
randomizer
self
.
transition_snapshots
=
{}
if
pB_min
>=
pB_max
:
raise
ValueError
(
"pB_min should be smaller than pB_max.
\n
"
+
"pB_min: "
+
str
(
pB_min
)
+
"
\n
"
+
"pB_max: "
+
str
(
pB_max
))
def
next_frame
(
self
,
n_stateA
=
0.0
,
n_stateB
=
0.0
,
pB_min
=
0
,
pB_max
=
1
,
snap_min
=
''
,
snap_frame
=
''
,
snap_max
=
''
):
if
snap_min
==
''
:
snap_min
=
self
.
snap_min
if
snap_frame
==
''
:
snap_frame
=
self
.
snap_frame
if
snap_max
==
''
:
snap_max
=
self
.
snap_max
n_stateA
=
float
(
n_stateA
)
n_stateB
=
float
(
n_stateB
)
self
.
pB
=
n_stateB
/
(
n_stateA
+
n_stateB
)
self
.
output_stream
.
write
(
"
\n
Committor is: "
+
str
(
self
.
pB
)
+
"
\n\n
"
)
self
.
output_stream
.
flush
()
if
pB_min
<=
self
.
pB
<=
pB_max
:
self
.
snap_frame
=
None
return
if
self
.
pB
<
pB_min
:
snap_min
=
snap_frame
if
self
.
pB
>
pB_max
:
snap_max
=
snap_frame
snap_frame
=
snap_min
+
((
snap_max
-
snap_min
)
/
2
)
if
snap_frame
<=
snap_min
or
snap_frame
>=
snap_max
:
if
pB_min
>=
pB_max
:
raise
ValueError
(
"pB_max should be higher than pB_min"
)
else
:
raise
RuntimeError
(
"Shooting frame is outside of shooting "
+
"range, you might want to increase "
+
"'n_per_snap'.
\n
"
)
else
:
self
.
snap_frame
=
snap_frame
self
.
snap_min
=
snap_min
self
.
snap_max
=
snap_max
def
run
(
self
):
randomizer
=
self
.
randomizer
for
trajectory
in
self
.
trajectories
:
self
.
snap_min
=
0
self
.
snap_max
=
len
(
trajectory
)
-
1
StateA
=
self
.
StateA
StateB
=
self
.
StateB
if
self
.
snap_frame
is
None
:
self
.
snap_frame
=
((
self
.
snap_max
-
self
.
snap_min
)
/
2
)
while
self
.
snap_frame
is
not
None
:
self
.
output_stream
.
write
(
"Shooting from frame:"
+
str
(
self
.
snap_frame
)
+
"
\n
"
)
self
.
output_stream
.
flush
()
snap
=
trajectory
[
self
.
snap_frame
]
simulation
=
paths
.
CommittorSimulation
(
storage
=
self
.
storage
,
engine
=
self
.
engine
,
states
=
[
StateA
,
StateB
],
randomizer
=
randomizer
,
direction
=
None
,
initial_snapshots
=
[
snap
]
)
simulation
.
run
(
n_per_snapshot
=
self
.
n_per_snap
)
try
:
results
=
paths
.
ShootingPointAnalysis
(
steps
=
self
.
storage
.
steps
[
-
self
.
n_per_snap
:],
states
=
[
StateA
,
StateB
]
)
except
AssertionError
:
raise
RuntimeError
(
"None of the tries entered a state, you
\
might need to increase n_frames_max of the
\
engine."
)
result
=
results
[
snap
]
self
.
next_frame
(
n_stateA
=
result
[
StateA
],
n_stateB
=
result
[
StateB
],
pB_min
=
self
.
pB_min
,
pB_max
=
self
.
pB_max
)
self
.
transition_snapshots
[
snap
]
=
self
.
pB
return
self
.
transition_snapshots
ops_tse/transitionstate_ensemble.pyc
0 → 100644
View file @
42c738b6
File added
ops_tse/version.py
0 → 100644
View file @
42c738b6
# This file is automatically generated by setup.py
short_version
=
'0.1.0'
version
=
'0.1.0'
full_version
=
'0.1.0.dev-f4093b4'
git_revision
=
'f4093b4f06c2e0f09f44caf692209f41ca83387a'
release
=
False
if
not
release
:
version
=
full_version
setup.py
View file @
42c738b6
...
...
@@ -47,7 +47,7 @@ def git_version():
return
GIT_REVISION
def
write_version_py
(
filename
=
'ops_
piggybacker
/version.py'
):
def
write_version_py
(
filename
=
'ops_
tse
/version.py'
):
cnt
=
"""
# This file is automatically generated by setup.py
short_version = '%(version)s'
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment