1+ from typing import Any
2+ import gymnasium as gym
3+ import rcs
4+ import numpy as np
5+
6+ from rcs ._core .sim import SimConfig
7+ from rcs .envs .base import ControlMode , RelativeActionSpace
8+ from rcs .envs .space_utils import ActObsInfoWrapper
9+ from rcs .sim import SimGripperConfig
10+ from rcs_tacto .tacto_wrapper import TactoSimWrapper
11+ from gymnasium .envs .registration import EnvCreator
12+
13+
14+ from rcs ._core .sim import CameraType , SimCameraConfig
15+ from rcs .envs .creators import SimTaskEnvCreator
16+
17+ SCENE_FILE = "beast_refiner/rcs_env/rcs_icra_scene_digit/scene.xml"
18+ CAMERAS = [
19+ "wrist" ,
20+ "bird_eye" ,
21+ "side" ,
22+ "side_right" ,
23+ "side_wide" ,
24+ ]
25+
26+ class AlignSpaceWrapper (ActObsInfoWrapper ):
27+
28+ def __init__ (self , env ):
29+ super ().__init__ (env )
30+ self .observation_space = gym .spaces .Dict ({
31+ "state" : gym .spaces .Box (low = - np .inf , high = np .inf , shape = (15 ,), dtype = np .float32 ),
32+ "images.image" : env .observation_space ["frames" ]["side" ]["rgb" ]["data" ],
33+ "images.image2" : env .observation_space ["frames" ]["wrist" ]["rgb" ]["data" ],
34+ "images.tactile_left" : gym .spaces .Box (low = 0 , high = 255 , shape = (320 , 240 , 3 ), dtype = np .uint8 ),
35+ "images.tactile_right" : gym .spaces .Box (low = 0 , high = 255 , shape = (320 , 240 , 3 ), dtype = np .uint8 ),
36+ })
37+ # self.action_space = env.action_space["joints"]
38+ gym .spaces .Box (low = - np .inf , high = np .inf , shape = (8 ,), dtype = np .float32 )
39+
40+ def observation (self , observation : dict [str , Any ], info : dict [str , Any ]) -> tuple [dict [str , Any ], dict [str , Any ]]:
41+ observation = {
42+ "state" : np .concat ([observation ["joints" ], observation ["gripper" ], info ["tau_ext" ]]),
43+ "images.image" : observation ["frames" ]["side" ]["rgb" ]["data" ],
44+ "images.image2" : observation ["frames" ]["wrist" ]["rgb" ]["data" ],
45+ "images.tactile_left" : observation ["frames" ]["tactile_left_tacto_pad_0" ]["rgb" ]["data" ],
46+ "images.tactile_right" : observation ["frames" ]["tactile_right_tacto_pad_0" ]["rgb" ]["data" ],
47+ }
48+ # tactile and tau_ext are still unsolved
49+
50+ return observation , info
51+
52+
53+ def action (self , action : np .ndarray ) -> dict [str , Any ]:
54+ action = {"joints" : action [:7 ], "gripper" : action [7 ]}
55+ return action
56+
57+
58+ def get_robot_cfg ():
59+ robot_cfg = rcs .sim .SimRobotConfig ()
60+ robot_cfg .tcp_offset = rcs .common .Pose (
61+ translation = np .array ([0.0 , 0.0 , 0.15 ]),
62+ rotation = np .array ([[0.707 , 0.707 , 0 ], [- 0.707 , 0.707 , 0 ], [0 , 0 , 1 ]]),
63+ )
64+ robot_cfg .robot_type = rcs .common .RobotType .FR3
65+ robot_cfg .add_id ("0" )
66+ robot_cfg .mjcf_scene_path = str (SCENE_FILE )
67+ robot_cfg .kinematic_model_path = rcs .scenes ["fr3_empty_world" ].mjcf_robot
68+ return robot_cfg
69+
70+
71+ def make_single_env ():
72+ robot_cfg = get_robot_cfg ()
73+ random_pos_args = {
74+ "joint_name" : "box_joint" ,
75+ "init_object_pose" : rcs .common .Pose (
76+ translation = np .array ([0.048 , 0 , 0.855 ]),
77+ quaternion = np .array ([0.0 , 0.0 , 1 , 0 ]),
78+ ),
79+ "include_rotation" : True ,
80+ "x_scale" : 0.3 ,
81+ "y_scale" : 0.4 ,
82+ "x_offset" : - 0.1 ,
83+ "y_offset" : - 0.2 ,
84+ }
85+ gripper_cfg = SimGripperConfig ()
86+ gripper_cfg .collision_geoms = []
87+ gripper_cfg .collision_geoms_fingers = []
88+ gripper_cfg .add_id ("0" )
89+
90+ sim_cfg = SimConfig ()
91+ sim_cfg .realtime = False
92+ sim_cfg .async_control = True
93+ sim_cfg .frequency = 30 # in Hz
94+
95+ resolution = (256 , 256 )
96+ cameras = {
97+ cam : SimCameraConfig (
98+ identifier = cam ,
99+ type = CameraType .fixed ,
100+ resolution_height = resolution [1 ],
101+ resolution_width = resolution [0 ],
102+ frame_rate = 0 , # this means render on demand
103+ )
104+ for cam in CAMERAS
105+ }
106+
107+ env_factory = SimTaskEnvCreator ()
108+ env = env_factory (
109+ robot_cfg = robot_cfg ,
110+ control_mode = ControlMode .JOINTS ,
111+ render_mode = "" , # put this to 'human' to open a gui to watch the simulation
112+ delta_actions = False ,
113+ cameras = cameras ,
114+ random_pos_args = random_pos_args ,
115+ gripper_cfg = gripper_cfg ,
116+ sim_cfg = sim_cfg ,
117+ )
118+ env = RelativeActionSpace (env , max_mov = np .deg2rad (90 ))
119+ env = TactoSimWrapper (
120+ env ,
121+ tacto_sites = ["left_tacto_pad_0" , "right_tacto_pad_0" ],
122+ tacto_geoms = ["box_geom" ],
123+ tacto_fps = 30 ,
124+ enable_depth = False ,
125+ visualize = False ,
126+ )
127+ env = AlignSpaceWrapper (env )
128+ return env
129+
130+
131+ def make_env (n_envs : int = 1 , use_async_envs : bool = False ):
132+ """
133+ Create vectorized environments for your custom task.
134+
135+ Args:
136+ n_envs: Number of parallel environments
137+ use_async_envs: Whether to use AsyncVectorEnv or SyncVectorEnv
138+
139+ Returns:
140+ gym.vector.VectorEnv or dict mapping suite names to vectorized envs
141+ """
142+
143+ # Choose vector environment type
144+ env_cls = gym .vector .AsyncVectorEnv if use_async_envs else gym .vector .SyncVectorEnv
145+
146+ # Create vectorized environment
147+ vec_env = env_cls ([make_single_env for _ in range (n_envs )])
148+
149+ return vec_env
150+
151+
152+ class Pick (EnvCreator ):
153+ def __call__ (self ) -> gym .Env :
154+ return make_single_env ()
0 commit comments