MAZALA2024 committed on
Commit be069c2 · verified · 1 Parent(s): db88504

Update rvc_service.py

Files changed (1)
  1. rvc_service.py +196 -73
rvc_service.py CHANGED
@@ -1,90 +1,213 @@
- import gradio as gr
- from rvc_service import RVCService
- import asyncio
- import logging
  import numpy as np
- from scipy.io import wavfile
  import os

- # Initialize logging
- logging.basicConfig(level=logging.DEBUG)
  logger = logging.getLogger(__name__)

- # Initialize RVC Service
- rvc_service = RVCService()
 
- async def convert_tts(model_name, audio_file, slang_rate):
-     try:
-         logger.debug(f"Received request - model: {model_name}, audio: {type(audio_file)}, slang: {slang_rate}")

-         if audio_file is None:
-             logger.error("No audio file provided")
-             return {"error": "No audio file uploaded."}, None

-         # Load and preprocess audio
-         if hasattr(audio_file, 'name'):
-             logger.debug(f"Audio file name: {audio_file.name}")
-             # Load audio file
-             sr, audio = wavfile.read(audio_file.name)
-             # Convert to mono if stereo
-             if len(audio.shape) > 1:
-                 audio = np.mean(audio, axis=1)

-             # Submit job to RVC service
-             job_id = await rvc_service.submit_job(
-                 audio_data=audio,
-                 model_name=model_name,
-                 priority=1  # Default priority
-             )

-             if not job_id:
-                 return {"error": "Service queue is full"}, None

-             # Wait for result (you might want to implement a better waiting mechanism)
-             for _ in range(30):  # Maximum 30 seconds wait
-                 if job_id in rvc_service.job_queue.processing:
-                     await asyncio.sleep(1)
                      continue

-                 # Check if job completed successfully
-                 output_path = f"outputs/output_{job_id}.wav"
-                 if os.path.exists(output_path):
-                     return {"info": f"Processed with job ID: {job_id}"}, output_path

-             return {"error": "Processing timeout"}, None

-         return {"error": "Invalid audio file"}, None
-
-     except Exception as e:
-         logger.error(f"Error in convert_tts: {str(e)}", exc_info=True)
-         return {"error": str(e)}, None
-
- # Modified interface with queue settings for better concurrency
- iface = gr.Interface(
-     fn=convert_tts,
-     inputs=[
-         gr.Dropdown(choices=get_model_names(), label="Model", interactive=True),
-         gr.Audio(label="Upload Audio", type="filepath"),
-         gr.Slider(minimum=0, maximum=1, step=0.01, label="Slang Rate"),
-     ],
-     outputs=[
-         gr.JSON(label="Info"),
-         gr.Audio(label="Converted Audio")
-     ],
-     title="Voice Conversion",
-     concurrency_limit=5,  # Limit concurrent requests
-     batch=False,  # Process requests individually
-     max_batch_size=1,
- ).queue()

- if __name__ == "__main__":
-     # Start RVC service
-     rvc_service.start()

-     # Launch Gradio interface
-     iface.launch(
-         debug=True,
-         show_error=True,
-         max_threads=10,
-         share=False
-     )
 
+ import torch
  import numpy as np
+ import logging
+ import queue
+ import threading
+ import time
+ from dataclasses import dataclass
+ from typing import Optional, Dict, List
+ import gc
+ from datetime import datetime, time as dt_time
  import os
+ from collections import deque
+ import asyncio

+ # Configure logging
+ logging.basicConfig(
+     level=logging.INFO,
+     format='%(asctime)s - %(levelname)s - %(message)s'
+ )
  logger = logging.getLogger(__name__)

+ @dataclass
+ class JobRequest:
+     """Represents a single voice conversion request"""
+     id: str  # Unique identifier for the job
+     audio_data: np.ndarray  # Input audio data
+     model_name: str  # Name of the RVC model to use
+     priority: int = 1  # Priority level (1-5, 5 being highest)
+     timestamp: float = None  # When the job was submitted
+
+     def __post_init__(self):
+         self.timestamp = time.time() if self.timestamp is None else self.timestamp
+
+ class ModelCache:
+     """Manages cached models with LRU eviction policy"""
+     def __init__(self, max_models: int = 3):
+         self.max_models = max_models
+         self.models: Dict[str, torch.nn.Module] = {}
+         self.model_usage: deque = deque()
+         self.lock = threading.Lock()
+
+     def get_model(self, model_name: str) -> Optional[torch.nn.Module]:
+         """Get model from cache, implementing LRU policy"""
+         with self.lock:
+             if model_name in self.models:
+                 # Update usage history
+                 self.model_usage.remove(model_name)
+                 self.model_usage.append(model_name)
+                 return self.models[model_name]
+             return None
+
+     def add_model(self, model_name: str, model: torch.nn.Module):
+         """Add model to cache, evicting least recently used if necessary"""
+         with self.lock:
+             if len(self.models) >= self.max_models:
+                 # Evict least recently used model
+                 lru_model = self.model_usage.popleft()
+                 del self.models[lru_model]
+                 # Force garbage collection to free GPU memory
+                 gc.collect()
+                 torch.cuda.empty_cache()
+
+             self.models[model_name] = model
+             self.model_usage.append(model_name)

+ class JobQueue:
+     """Manages prioritized job queue with rate limiting"""
+     def __init__(self, max_size: int = 100):
+         self.queue = queue.PriorityQueue(maxsize=max_size)
+         self.processing: Dict[str, JobRequest] = {}
+         self.lock = threading.Lock()
+         self.last_processed = time.time()
+         self.rate_limit = 1.0  # Minimum seconds between jobs

+     def add_job(self, job: JobRequest) -> bool:
+         """Add job to queue with priority"""
+         try:
+             # Priority tuple: (priority reversed, timestamp, job)
+             # Lower number = higher priority
+             self.queue.put((6 - job.priority, job.timestamp, job), block=False)
+             logger.info(f"Added job {job.id} to queue. Priority: {job.priority}")
+             return True
+         except queue.Full:
+             logger.warning("Queue is full, job rejected")
+             return False
+
+     def get_next_job(self) -> Optional[JobRequest]:
+         """Get next job respecting rate limiting"""
+         if time.time() - self.last_processed < self.rate_limit:
+             return None
+
+         try:
+             _, _, job = self.queue.get(block=False)
+             with self.lock:
+                 self.processing[job.id] = job
+                 self.last_processed = time.time()
+             return job
+         except queue.Empty:
+             return None
 
+ class RVCService:
+     """Main service class for RVC processing"""
+     def __init__(self):
+         self.model_cache = ModelCache(max_models=3)
+         self.job_queue = JobQueue(max_size=100)
+         self.is_running = False
+         self.worker_thread = None
+
+         # Operating hours (24-hour format)
+         self.start_time = dt_time(9, 0)  # 9:00 AM
+         self.end_time = dt_time(0, 0)  # 12:00 AM
+
+     def within_operating_hours(self) -> bool:
+         """Check if current time is within operating hours"""
+         current_time = datetime.now().time()
+         if self.start_time <= self.end_time:
+             return self.start_time <= current_time <= self.end_time
+         else:  # Handles overnight operation (e.g., 9 AM to 12 AM)
+             return current_time >= self.start_time or current_time <= self.end_time
+
+     async def process_audio(self, job: JobRequest) -> Optional[np.ndarray]:
+         """Process a single audio conversion job"""
+         try:
+             # Get or load model
+             model = self.model_cache.get_model(job.model_name)
+             if model is None:
+                 logger.info(f"Loading model {job.model_name}")
+                 # Here you would load your RVC model
+                 # model = load_rvc_model(job.model_name)
+                 self.model_cache.add_model(job.model_name, model)
+
+             # Process audio
+             with torch.cuda.amp.autocast():
+                 # Your RVC processing logic here
+                 # output = model.convert_voice(job.audio_data)
+                 output = job.audio_data  # Placeholder
+
+             return output

+         except Exception as e:
+             logger.error(f"Error processing job {job.id}: {str(e)}")
+             return None

+     async def worker_loop(self):
+         """Main worker loop processing jobs from queue"""
+         while self.is_running:
+             try:
+                 # Check operating hours
+                 if not self.within_operating_hours():
+                     logger.info("Outside operating hours, worker sleeping...")
+                     await asyncio.sleep(300)  # Check every 5 minutes
+                     continue

+                 # Get next job
+                 job = self.job_queue.get_next_job()
+                 if job is None:
+                     await asyncio.sleep(0.1)  # Prevent busy waiting
                      continue

+                 logger.info(f"Processing job {job.id}")
+                 output = await self.process_audio(job)
+
+                 if output is not None:
+                     logger.info(f"Successfully processed job {job.id}")
+                 else:
+                     logger.error(f"Failed to process job {job.id}")
+
+                 # Cleanup
+                 with self.job_queue.lock:
+                     self.job_queue.processing.pop(job.id, None)

+             except Exception as e:
+                 logger.error(f"Worker error: {str(e)}")
+                 await asyncio.sleep(1)  # Prevent rapid error loops
+
+     def start(self):
+         """Start the service"""
+         if not self.is_running:
+             self.is_running = True
+             asyncio.create_task(self.worker_loop())
+             logger.info("RVC Service started")

+     def stop(self):
+         """Stop the service"""
+         self.is_running = False
+         logger.info("RVC Service stopping...")
+
+     async def submit_job(self, audio_data: np.ndarray, model_name: str, priority: int = 1) -> str:
+         """Submit a new job to the service"""
+         job_id = f"job_{int(time.time())}_{id(audio_data)}"
+         job = JobRequest(
+             id=job_id,
+             audio_data=audio_data,
+             model_name=model_name,
+             priority=priority
+         )
+
+         if self.job_queue.add_job(job):
+             return job_id
+         return None
 
+ # Memory management utilities
+ def cleanup_gpu_memory():
+     """Force cleanup of GPU memory"""
+     gc.collect()
+     torch.cuda.empty_cache()

+ def monitor_gpu_memory():
+     """Log GPU memory usage"""
+     if torch.cuda.is_available():
+         allocated = torch.cuda.memory_allocated() / 1024**2
+         reserved = torch.cuda.memory_reserved() / 1024**2
+         logger.info(f"GPU Memory: {allocated:.2f}MB allocated, {reserved:.2f}MB reserved")
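With this commit, rvc_service.py no longer contains the Gradio app, so a caller has to drive RVCService itself, roughly the way the removed convert_tts handler did. The sketch below is one minimal way to do that and is not part of the commit: it assumes the new module exposes RVCService and monitor_gpu_memory exactly as added above, the model name "example_model" and the dummy waveform are placeholders, and the completion check simply polls job_queue.processing like the old handler (whose own comment already flagged that as a stopgap).

# sketch: driving the new RVCService from an async caller (hypothetical, not part of the commit)
import asyncio

import numpy as np

from rvc_service import RVCService, monitor_gpu_memory


async def main():
    service = RVCService()
    service.start()  # schedules worker_loop() on the already-running event loop

    # Placeholder mono waveform; a real caller would pass the decoded upload audio.
    audio = np.zeros(16000, dtype=np.float32)

    job_id = await service.submit_job(audio_data=audio, model_name="example_model", priority=3)
    if job_id is None:
        print("Queue is full, try again later")
        return

    # Crude completion check mirroring the removed convert_tts wait loop:
    # give the worker time to pick the job up, then wait until it leaves
    # the processing map or the timeout expires.
    await asyncio.sleep(2)
    for _ in range(30):
        if job_id not in service.job_queue.processing:
            break
        await asyncio.sleep(1)

    monitor_gpu_memory()
    service.stop()


if __name__ == "__main__":
    asyncio.run(main())

Two details worth noting against the removed code: start() relies on asyncio.create_task(), so it only works once an event loop is running (the old __main__ block called rvc_service.start() before launching Gradio, outside any loop), and process_audio() returns the converted array rather than writing outputs/output_{job_id}.wav, so the old file-based polling in convert_tts would need extra glue to find a result.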