# Stability AI

```elixir
Mix.install([
  {:tesla, "~> 1.4"},

  # optional, but recommended adapter
  {:hackney, "~> 1.17"},

  # optional, required by JSON middleware
  {:jason, ">= 1.0.0"},
  {:kino, "~> 0.12.0"}
])

Kino.nothing()
```

## Generate images with Stable Diffusion

```elixir
Kino.Markdown.new("""
Stability AI provides their Stable Diffusion family of image generation models. This application allows you to interface with Stability AI's APIs to generate images with your choice of models.

*To be able to generate images you need your Stability AI API-key. When you provide the key it will be used with every request to Stability AI but otherwise will be private to your session - meaning you will be asked again for your API-key if you were to close this session or if it were timed out.*
""")
```

```elixir
#Kino.FS.file_path("giraffe-explosion.webp")
#|> File.read!()
#|> Kino.Image.new("image/webp")
#|> Kino.render()
#
#Kino.Markdown.new(
#  "> Example of image generated with Stable Diffusion 3: `A running giraffe, behind it an explosion, all surrounded by the vastness of space and a galaxy.` "
#)

Kino.nothing()
```

```elixir
defmodule StabilityAI do
  @moduledoc """
  Tesla client for the Stability AI HTTP API, plus the notebook glue that turns
  a form submission into a rendered image.

  The module reads the selected model and the API key from `ModelVault` and
  `KeyVault`, and records generated files in `ImagesVault`.
  """

  use Tesla

  @sd3_uri "/v2beta/stable-image/generate/sd3"

  @models %{
    sd1_6: "stable-diffusion-v1-6",
    sdxl: "stable-diffusion-xl-1024-v1-0",
    sd3: "sd3",
    sd3_turbo: "sd3-turbo"
  }

  @sd_uris %{
    sd1_6: "/v1/generation/stable-diffusion-v1-6/text-to-image",
    sdxl: "/v1/generation/stable-diffusion-xl-1024-v1-0/text-to-image",
    sd3: "/v2beta/stable-image/generate/sd3",
    sd3_turbo: "/v2beta/stable-image/generate/sd3"
  }

  @base_url "https://api.stability.ai"

  @doc """
  Returns the directory (with a trailing slash) where generated images and ZIP
  archives are stored, located under `Kino.tmp_dir/0`.
  """
  def image_path() do
    Kino.tmp_dir() <> "/image_generations/"
  end

  @doc """
  Requests one image for `model_prompt` from the endpoint that serves `model`.

  `model` must be one of `:sd1_6`, `:sdxl`, `:sd3` or `:sd3_turbo`; any other
  value raises `CaseClauseError`. `model_prompt` is the submitted form data. The
  prompt must be longer than 5 bytes and the aspect ratio at least 3 bytes,
  otherwise the request function raises `FunctionClauseError`.

  Returns whatever the underlying Tesla request returns.
  """
  def get_image(model_prompt, model, api_key) do
    case model do
      v1 when v1 in [:sd1_6, :sdxl] -> get_image_sd1(model_prompt, v1, api_key)
      v3 when v3 in [:sd3, :sd3_turbo] -> get_image_sd3(model_prompt, v3, api_key)
    end
  end

  # Text-to-image request against the v1 JSON endpoints (SD 1.6 and SDXL).
  # NOTE(review): the negative prompt collected by the form is not sent to the
  # v1 endpoints; only `prompt` ends up in `text_prompts`.
  defp get_image_sd1(
         %{
           prompt: prompt,
           negative_prompt: _negative_prompt,
           aspect_ratio: aspect_ratio,
           style_preset: style_preset
         },
         model,
         api_key
       )
       when byte_size(prompt) > 5 and byte_size(aspect_ratio) >= 3 do
    client =
      Tesla.client([
        {Tesla.Middleware.BaseUrl, @base_url},
        Tesla.Middleware.JSON,
        {Tesla.Middleware.Headers,
         [
           {"Authorization", "Bearer #{api_key}"},
           {"Accept", "image/png"},
           {"Stability-Client-ID", "Livebook Stability"},
           {"Stability-Client-Version", "0.1"}
         ]}
      ])

    uri = Map.fetch!(@sd_uris, model)

    # The v1 endpoints take explicit pixel dimensions instead of a ratio.
    {width, height} =
      case aspect_ratio do
        "21:9" -> {1536, 640}
        "16:9" -> {1344, 768}
        "3:2" -> {1216, 832}
        "1:1" -> {1024, 1024}
        "2:3" -> {832, 1216}
        "9:16" -> {768, 1344}
        "9:21" -> {640, 1536}
      end

    body = %{
      text_prompts: [%{text: prompt, weight: 0.5}],
      height: height,
      width: width,
      cfg_scale: 7,
      clip_guidance_preset: "NONE",
      samples: 1,
      seed: 0,
      steps: 30
    }

    # The form's "None" option yields nil; the field is then left out entirely.
    body =
      if is_nil(style_preset) do
        body
      else
        Map.put(body, :style_preset, style_preset)
      end

    post(client, uri, body)
  end

  # Image request against the v2beta multipart endpoint (SD3 and SD3 Turbo).
  defp get_image_sd3(
         %{
           prompt: prompt,
           negative_prompt: negative_prompt,
           aspect_ratio: aspect_ratio
         },
         model,
         api_key
       )
       when byte_size(prompt) > 5 and byte_size(aspect_ratio) >= 3 do
    client =
      Tesla.client([
        {Tesla.Middleware.BaseUrl, @base_url},
        Tesla.Middleware.FormUrlencoded
      ])

    body_template =
      Tesla.Multipart.new()
      |> Tesla.Multipart.add_field("prompt", prompt)
      |> Tesla.Multipart.add_field("output_format", "png")
      |> Tesla.Multipart.add_field("aspect_ratio", aspect_ratio)
      |> Tesla.Multipart.add_field("model", Map.get(@models, model))

    # Only send a negative prompt when the user actually typed one.
    body =
      case byte_size(negative_prompt) do
        0 -> body_template
        _ -> Tesla.Multipart.add_field(body_template, "negative_prompt", negative_prompt)
      end

    post(
      client,
      @sd3_uri,
      body,
      headers: [
        {"Authorization", "Bearer #{api_key}"},
        {"Accept", "image/*"}
      ]
    )
  end

  @doc """
  Fetches the account balance for `api_key` from `/v1/user/balance`.

  Returns the Tesla result of the request; the body is JSON-decoded by the JSON
  middleware.
  """
  def get_balance(api_key) do
    client =
      Tesla.client([
        {Tesla.Middleware.BaseUrl, @base_url},
        Tesla.Middleware.JSON,
        {Tesla.Middleware.Headers,
         [
           {"Authorization", "Bearer #{api_key}"},
           {"Accept", "application/json"},
           {"Stability-Client-ID", "Livebook Stability"},
           {"Stability-Client-Version", "0.1"}
         ]}
      ])

    get(client, "/v1/user/balance")
  end

  @doc """
  Fetches the balance for the key stored in `KeyVault` and renders it into
  `balance_frame`.

  Returns `:error` (rendering nothing) when the request fails or the response
  does not carry a `"credits"` field.
  """
  def refresh_balance(balance_frame) do
    case get_balance(KeyVault.value()) do
      {:ok, %Tesla.Env{status: 200, body: %{"credits" => credits}}} ->
        Kino.Frame.render(balance_frame, Kino.Markdown.new("**Balance** `#{credits}`"))

      _ ->
        :error
    end
  end

  @doc """
  Submits `data` for generation unless the rate limiter rejects the attempt.

  On acceptance, renders "Loading..." into the status frame and returns the
  result of `get_image/3` using the model and key held in `ModelVault` and
  `KeyVault`. Returns `nil` when `RateLimitedForm.rate_limit/1` answers `:halt`.
  """
  def throttled_submission(form, data, status_frame: stability_status) do
    case RateLimitedForm.rate_limit(form) do
      :ok ->
        IO.puts("Generating with Stability AI...")
        Kino.Frame.render(stability_status, Kino.Markdown.new("Loading..."))
        StabilityAI.get_image(data, ModelVault.value(), KeyVault.value())

      :halt ->
        IO.puts("Please wait a few seconds before submitting again")
        nil
    end
  end

  @doc """
  Handles the outcome of `throttled_submission/3`.

  * A `200` response: the body is written to a PNG under `image_path/0`, stored
    in `ImagesVault`, shown in the preview frame, the gallery in `output_frame`
    is rebuilt and the balance is refreshed. Returns `{:ok, filepath}`.
  * `nil` (the submission was rate limited): the status frame asks the user to
    wait. Returns `{:error, :rate_limited}`.
  * Anything else: the status frame shows a generic error and a reduced view of
    the response is printed. Returns `{:error, response}`.

  The options must be given as a keyword list in exactly the order shown in the
  function head.
  """
  def handle_response(
        response,
        data,
        status_frame: stability_status,
        preview_frame: preview_frame,
        output_frame: output_frame,
        balance_frame: balance_frame
      ) do
    case response do
      {:ok, %Tesla.Env{status: 200, body: body}} ->
        filepath = store_image(body, data.prompt)
        ImagesVault.store(filepath)

        Kino.Frame.render(stability_status, Kino.Markdown.new("Done!"))
        # The bytes are already in memory; no need to read the file back.
        Kino.Frame.render(preview_frame, Kino.Image.new(body, "image/png"))
        Kino.Frame.render(output_frame, gallery())

        refresh_balance(balance_frame)

        {:ok, filepath}

      nil ->
        # A throttled submission is not a failure; do not report it as one.
        Kino.Frame.render(
          stability_status,
          Kino.Markdown.new("Please wait a few seconds before submitting again")
        )

        {:error, :rate_limited}

      _ ->
        Kino.Frame.render(stability_status, Kino.Markdown.new("Something went wrong!"))
        IO.inspect(loggable(response), label: "Stability AI response")
        {:error, response}
    end
  end

  # Writes the image bytes to a uniquely named PNG and returns its path.
  defp store_image(body, prompt) do
    dir = image_path()
    File.mkdir_p!(dir)

    filename =
      :crypto.hash(:sha, "#{prompt} - #{:os.system_time(:second)}")
      |> Base.encode16(case: :lower)

    filepath = dir <> filename <> ".png"
    File.write!(filepath, body)
    IO.puts("Written file to '#{filepath}'")
    filepath
  end

  # Grid of every image recorded in ImagesVault, in the vault's order.
  defp gallery() do
    images =
      for path <- ImagesVault.value() do
        path |> File.read!() |> Kino.Image.new(:png)
      end

    Kino.Layout.grid(images, columns: 3)
  end

  # Reduces a response to what is safe to print. A full Tesla.Env also carries
  # the client it was sent with, and that client's header middleware holds the
  # "Authorization: Bearer <key>" value, so printing the env may expose the key.
  defp loggable({:ok, %Tesla.Env{status: status, body: body}}), do: %{status: status, body: body}
  defp loggable(other), do: other
end

defmodule RateLimitedForm do
  @moduledoc """
  Process registered under the module name that allows one submission every
  few seconds. A single limiter is shared by every form.
  """

  use GenServer

  # Minimum number of seconds between two accepted submissions.
  @min_interval_seconds 5

  def start_link(form, opts) do
    GenServer.start_link(__MODULE__, {form, opts}, name: __MODULE__)
  end

  @doc """
  Returns `:ok` when a submission is allowed right now, `:halt` otherwise.
  The `form` argument is currently not used to distinguish callers.
  """
  def rate_limit(form) do
    GenServer.call(__MODULE__, {:rate_limit, form})
  end

  @impl true
  def init({form, _opts}) do
    {:ok, %{form: form, last_submitted: nil}}
  end

  @impl true
  def handle_call({:rate_limit, _form}, _from, state) do
    # Monotonic time cannot jump when the system clock is adjusted.
    now = System.monotonic_time(:second)

    if state.last_submitted != nil and now - state.last_submitted < @min_interval_seconds do
      {:reply, :halt, state}
    else
      {:reply, :ok, %{state | last_submitted: now}}
    end
  end
end

defmodule KeyVault do
  @moduledoc "Agent holding the Stability AI API key for the current session."

  use Agent

  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end, name: __MODULE__)
  end

  def value do
    Agent.get(__MODULE__, & &1)
  end

  def store(value) do
    Agent.update(__MODULE__, fn _ -> value end)
  end
end

defmodule ImagesVault do
  @moduledoc "Agent holding the paths of generated images, newest first."

  use Agent

  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end, name: __MODULE__)
  end

  def value do
    Agent.get(__MODULE__, & &1)
  end

  @doc "Returns the most recently stored path, or `nil` when nothing is stored."
  def latest do
    Agent.get(__MODULE__, &List.first/1)
  end

  @doc "Prepends `image` to the stored list."
  def store(image) do
    Agent.update(__MODULE__, fn images -> [image | images] end)
  end
end

defmodule ModelVault do
  @moduledoc "Agent holding the currently selected model atom."

  use Agent

  def start_link(initial_value) do
    Agent.start_link(fn -> initial_value end, name: __MODULE__)
  end

  def value do
    Agent.get(__MODULE__, & &1)
  end

  def store(value) do
    Agent.update(__MODULE__, fn _ -> value end)
  end
end

# The results are deliberately not matched: when the cell is re-evaluated the
# agents are already running and start_link returns {:error, {:already_started, _}}.
KeyVault.start_link("")
ImagesVault.start_link([])
ModelVault.start_link(:sd1_6)

balance_frame = Kino.Frame.new()

api_key_form =
  Kino.Control.form(
    [
      api_key: Kino.Input.password("Stability API Key")
    ],
    submit: "OK"
  )

api_key_status = Kino.Frame.new(placeholder: false)

# Store the submitted key and show the balance it gives access to.
Kino.listen(api_key_form, fn
  %{type: :submit, data: %{api_key: api_key}} ->
    KeyVault.store(api_key)
    Kino.Frame.append(api_key_status, Kino.Markdown.new("*Updated key*"))
    StabilityAI.refresh_balance(balance_frame)

  _ ->
    IO.puts("Error: Not specified event for API Key form!")
end)

# Inputs shared by the prompt forms of every model family.
basic_form_fields = [
  prompt: Kino.Input.textarea("Prompt"),
  negative_prompt: Kino.Input.text("Negative Prompt"),
  aspect_ratio:
    Kino.Input.select("Aspect Ratio", [
      {"21:9", "21:9"},
      {"16:9", "16:9"},
      {"3:2", "3:2"},
      {"1:1", "1:1"},
      {"2:3", "2:3"},
      {"9:16", "9:16"},
      {"9:21", "9:21"}
    ])
]

# Form shown for :sd1_6 and :sdxl; it adds a style preset to the shared fields.
v1_form =
  Kino.Control.form(
    basic_form_fields ++
      [
        style_preset:
          Kino.Input.select("Predefined Style", [
            {"3d-model", "3D Model"},
            {"analog-film", "Analog Film"},
            {"anime", "Anime"},
            {"cinematic", "Cinematic"},
            {"comic-book", "Comic Book"},
            {"digital-art", "Digital Art"},
            {"enhance", "Enhance"},
            {"fantasy-art", "Fantasy Art"},
            {"isometric", "Isometric"},
            {"line-art", "Line Art"},
            {"low-poly", "Low Poly"},
            {"modeling-compound", "Modeling Compound"},
            {"neon-punk", "Neon Punk"},
            {"origami", "Origami"},
            {"photographic", "Photographic"},
            {"pixel-art", "Pixel Art"},
            {"tile-texture", "Tile Texture"},
            {nil, "None"}
          ])
      ],
    submit: "Submit"
  )

# Form shown for :sd3 and :sd3_turbo.
v3_form = Kino.Control.form(basic_form_fields, submit: "Submit")

prompt_frame = Kino.Frame.new()
Kino.Frame.render(prompt_frame, v1_form)

# RateLimitedForm registers under its module name, so a single limiter serves
# both forms; starting it a second time could only return :already_started.
case RateLimitedForm.start_link(v1_form, []) do
  {:ok, _pid} -> :ok
  {:error, {:already_started, _pid}} -> :ok
  other -> other
end

Kino.nothing()
```

```elixir
Kino.Layout.grid(
  [
    api_key_form,
    api_key_status
  ],
  columns: 2
)
```

```elixir
# Select model to use
# model_form =
#   Kino.Control.form(
#     [
#       model:
#         Kino.Input.select("Model", [
#           {:sd1_6, "SD 1.6"},
#           {:sdxl, "SDXL"},
#           {:sd3, "SD 3"},
#           {:sd3_turbo, "SD 3-Turbo"}
#         ])
#     ],
#     submit: "Select Model"
#   )

model_selection =
  Kino.Input.select("Model", [
    {:sd1_6, "SD 1.6"},
    {:sdxl, "SDXL"},
    {:sd3, "SD 3"},
    {:sd3_turbo, "SD 3-Turbo"}
  ])

# Remember the chosen model and swap in the prompt form that matches it.
Kino.listen(model_selection, fn %{value: model} ->
  ModelVault.store(model)

  form =
    case model do
      :sd1_6 -> v1_form
      :sdxl -> v1_form
      :sd3 -> v3_form
      :sd3_turbo -> v3_form
    end

  Kino.Frame.render(prompt_frame, form)
end)

model_selection
```

````elixir
stability_status = Kino.Frame.new()
prompt_status = Kino.Frame.new()
preview_frame = Kino.Frame.new(placeholder: false)

# Show the newest stored image again when the cell is re-evaluated.
case ImagesVault.latest() do
  nil ->
    nil

  latest_image_file ->
    latest_image = latest_image_file |> File.read!() |> Kino.Image.new(:png)
    Kino.Frame.render(preview_frame, latest_image)
end

status_layout =
  Kino.Layout.grid(
    [Kino.Markdown.new("### *Status*"), prompt_status, stability_status],
    columns: 1
  )

form_frame = Kino.Frame.new()
form_layout = Kino.Layout.grid([form_frame, status_layout], columns: 2)
Kino.render(form_layout)

Kino.Frame.append(form_frame, Kino.Markdown.new("## Generate Image"))
Kino.Frame.append(form_frame, balance_frame)
Kino.Frame.append(form_frame, prompt_frame)

images =
  for image_path <- ImagesVault.value() do
    image_path |> File.read!() |> Kino.Image.new(:png)
  end

images_grid = Kino.Layout.grid(images, columns: 3)
output_frame = Kino.Frame.new()

Kino.Markdown.new("""
___

Preview:
""")
|> Kino.render()

Kino.render(preview_frame)
Kino.Frame.append(output_frame, images_grid)

# Shared by both prompt forms: echo the request, submit it through the rate
# limiter, then render the outcome. `form` is the form that was submitted.
handle_submit = fn form, %{data: data} ->
  model = ModelVault.value()

  Kino.Frame.render(
    prompt_status,
    Kino.Markdown.new("""
    ```json
    {
      "prompt": #{data.prompt}
      "negative_prompt": #{data.negative_prompt}
      "aspect_ratio": #{data.aspect_ratio}
      "model": #{model}
    }
    ```
    """)
  )

  response = StabilityAI.throttled_submission(form, data, status_frame: stability_status)

  StabilityAI.handle_response(
    response,
    data,
    status_frame: stability_status,
    preview_frame: preview_frame,
    output_frame: output_frame,
    balance_frame: balance_frame
  )
end

Enum.each([v1_form, v3_form], fn form ->
  Kino.listen(form, fn
    %{type: :submit} = event -> handle_submit.(form, event)
    _ -> IO.puts("What?")
  end)
end)

Kino.Markdown.new("""
___
""")
|> Kino.render()

output_frame
````

```elixir
download_button = Kino.Control.button("ZIP images")
download_frame = Kino.Frame.new()
Kino.render(download_button)
Kino.render(download_frame)

# Bundle every stored image into a timestamped ZIP and offer it for download.
Kino.listen(download_button, fn _event ->
  case ImagesVault.value() do
    [] ->
      # Nothing has been generated yet, so there is nothing to archive.
      IO.puts("No images to ZIP yet")

    image_files ->
      now =
        DateTime.utc_now()
        |> DateTime.to_string()
        |> String.replace(~r/[:\s-]/, "")
        |> String.replace(".", "_")

      # :zip expects charlists; entries are given relative to the :cwd option.
      files_to_zip =
        Enum.map(image_files, fn file_path ->
          file_path |> Path.basename() |> String.to_charlist()
        end)

      zip_filename = "stability_images_#{now}.zip"
      zip_filepath = StabilityAI.image_path() <> zip_filename

      case :zip.create(
             String.to_charlist(zip_filepath),
             files_to_zip,
             cwd: String.to_charlist(StabilityAI.image_path())
           ) do
        {:ok, _} ->
          download_prompt =
            Kino.Download.new(
              fn -> File.read!(zip_filepath) end,
              filename: zip_filename,
              label: zip_filename
            )

          IO.puts("ZIP file created: #{zip_filename}")
          Kino.Frame.render(download_frame, download_prompt)

        error ->
          IO.inspect(error)
          IO.puts("Error creating ZIP")
      end
  end
end)

Kino.nothing()
```