Ansible API

摘要:
可参考文章:链接ansible2.8,对于初始化有了改变,2.7使用了Python标准库里的命名元组来初始化选项,而2.8是Ansible自己封装了一个ImmutableDict,之后需要和context结合使用的。tip:pip3installansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。

一、环境说明

当前环境说明:

  • python3.8.6
  • ansible2.10.5

ansible版本差异说明:

  • ansible2.0版本前后有较大改变,鉴于当前已经到了2.10版本,不再过多说明历史2.0版本的变动。可参考文章:链接
  • ansible2.4,对于Inventory-->InventoryManager VariableManager类的使用,导入方式进行了改变。可参考文章:链接
  • ansible2.8,对于初始化有了改变,2.7 使用了Python标准库里的命名元组来初始化选项,而2.8是Ansible自己封装了一个ImmutableDict,之后需要和 context结合使用的。

tip:pip3 install ansible的方式安装,ansible.cfg文件会在site-packages/ansible/galaxy/data/container/tests下。

二、官方代码解析

2.1、官方示例(Ad-hoc)

1 #!/usr/bin/env python
2 
3 from __future__ import(absolute_import, division, print_function)
4 __metaclass__ =type
5 
6 importjson
7 importshutil
8 
9 importansible.constants as C
10 from ansible.executor.task_queue_manager importTaskQueueManager
11 from ansible.module_utils.common.collections importImmutableDict
12 from ansible.inventory.manager importInventoryManager
13 from ansible.parsing.dataloader importDataLoader
14 from ansible.playbook.play importPlay
15 from ansible.plugins.callback importCallbackBase
16 from ansible.vars.manager importVariableManager
17 from ansible importcontext
18 
19 
20 #Create a callback plugin so we can capture the output
21 classResultsCollectorJSONCallback(CallbackBase):
22     """A sample callback plugin used for performing an action as results come in.
23 
24 If you want to collect all results into a single object for processing at
25 the end of the execution, look into utilizing the ``json`` callback plugin
26 or writing your own custom callback plugin.
27     """
28 
29     def __init__(self, *args, **kwargs):
30         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
31         self.host_ok ={}
32         self.host_unreachable ={}
33         self.host_failed ={}
34 
35     defv2_runner_on_unreachable(self, result):
36         host =result._host
37         self.host_unreachable[host.get_name()] =result
38 
39     def v2_runner_on_ok(self, result, *args, **kwargs):
40         """Print a json representation of the result.
41 
42 Also, store the result in an instance attribute for retrieval later
43         """
44         host =result._host
45         self.host_ok[host.get_name()] =result
46         print(json.dumps({host.name: result._result}, indent=4))
47 
48     def v2_runner_on_failed(self, result, *args, **kwargs):
49         host =result._host
50         self.host_failed[host.get_name()] =result
51 
52 
53 defmain():
54     host_list = ['localhost', 'www.example.com', 'www.google.com']
55     #since the API is constructed for CLI it expects certain options to always be set in the context object
56     context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
57                                     become_method=None, become_user=None, check=False, diff=False)
58     #required for
59     #https://github.com/ansible/ansible/blob/devel/lib/ansible/inventory/manager.py#L204
60     sources = ','.join(host_list)
61     if len(host_list) == 1:
62         sources += ','
63 
64     #initialize needed objects
65     loader = DataLoader()  #Takes care of finding and reading yaml, json and ini files
66     passwords = dict(vault_pass='secret')
67 
68     #Instantiate our ResultsCollectorJSONCallback for handling results as they come in. Ansible expects this to be one of its main display outlets
69     results_callback =ResultsCollectorJSONCallback()
70 
71     #create inventory, use path to host config file as source or hosts in a comma separated string
72     inventory = InventoryManager(loader=loader, sources=sources)
73 
74     #variable manager takes care of merging all the different sources to give you a unified view of variables available in each context
75     variable_manager = VariableManager(loader=loader, inventory=inventory)
76 
77     #instantiate task queue manager, which takes care of forking and setting up all objects to iterate over host list and tasks
78     #IMPORTANT: This also adds library dirs paths to the module loader
79     #IMPORTANT: and so it must be initialized before calling `Play.load()`.
80     tqm =TaskQueueManager(
81         inventory=inventory,
82         variable_manager=variable_manager,
83         loader=loader,
84         passwords=passwords,
85         stdout_callback=results_callback,  #Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
86 )
87 
88     #create data structure that represents our play, including tasks, this is basically what our YAML loader does internally.
89     play_source =dict(
90         name="Ansible Play",
91         hosts=host_list,
92         gather_facts='no',
93         tasks=[
94             dict(action=dict(module='shell', args='ls'), register='shell_out'),
95             dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
96             dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
97 ]
98 )
99 
100     #Create play object, playbook objects use .load instead of init or new methods,
101     #this will also automatically create the task objects from the info provided in play_source
102     play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
103 
104     #Actually run it
105     try:
106         result = tqm.run(play)  #most interesting data for a play is actually sent to the callback's methods
107     finally:
108         #we always need to cleanup child procs and the structures we use to communicate with them
109 tqm.cleanup()
110         ifloader:
111 loader.cleanup_all_tmp_files()
112 
113     #Remove ansible tmpdir
114 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
115 
116     print("UP ***********")
117     for host, result inresults_callback.host_ok.items():
118         print('{0} >>> {1}'.format(host, result._result['stdout']))
119 
120     print("FAILED *******")
121     for host, result inresults_callback.host_failed.items():
122         print('{0} >>> {1}'.format(host, result._result['msg']))
123 
124     print("DOWN *********")
125     for host, result inresults_callback.host_unreachable.items():
126         print('{0} >>> {1}'.format(host, result._result['msg']))
127 
128 
129 if __name__ == '__main__':
130     main()
View Code

2.2、拆分说明

  • 导入模块详解
1 #初始化ansible的配置选项,比如: 指定远程用户remote_user=None
2 from ansible.module_utils.common.collections importImmutableDict
3 #上下文管理器,他就是用来接收 ImmutableDict 的示例对象
4 from ansible importcontext
5 #管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
6 from ansible.parsing.dataloader importDataLoader
7 #变量管理器,包括主机,组,扩展等变量
8 from ansible.vars.manager importVariableManager
9 #用于创建和管理inventory,导入inventory文件
10 from ansible.inventory.manager importInventoryManager
11 #用于执行 Ad-hoc 的类
12 from ansible.playbook.play importPlay
13 #ad-hoc ansible底层用到的任务队列
14 from ansible.executor.task_queue_manager importTaskQueueManager
15 #回调基类,用来定义回调事件,比如返回失败成功等信息
16 from ansible.plugins.callback importCallbackBase
17 #执行playbook
18 from ansible.executor.playbook_executor importPlaybookExecutor
19 #操作单个主机,可以给主机添加变量等操作
20 from ansible.inventory.host importHost
21 #操作单个主机组,可以给组添加变量等操作
22 from ansible.inventory.group importGroup
23 #用于获取 ansible 产生的临时文档
24 import ansible.constants as C
  • 回调类
1 classResultsCollectorJSONCallback(CallbackBase):
2     """
3 回调插件就是一个类。 将执行命令的结果放到一个对象中,一边用json或自定义插件,获取执行结果的。 我们可以改写这个类,以便满足我们查询执行结果格式的需求。
4     """
5 
6     def __init__(self, *args, **kwargs):
7         super(ResultsCollectorJSONCallback, self).__init__(*args, **kwargs)
8         self.host_ok ={}
9         self.host_unreachable ={}
10         self.host_failed ={}
11 
12     defv2_runner_on_unreachable(self, result):
13         host =result._host
14         self.host_unreachable[host.get_name()] =result
15 
16     def v2_runner_on_ok(self, result, *args, **kwargs):
17         """
18 将结果以json形式打印。indent=4 是json格式美化参数,便于阅读
19 将结果存储在实例属性中,以便稍后检索
20         """
21         host =result._host
22         self.host_ok[host.get_name()] =result
23         print(json.dumps({host.name: result._result}, indent=4))
24 
25     def v2_runner_on_failed(self, result, *args, **kwargs):
26         host =result._host
27         self.host_failed[host.get_name()] = result
  • 初始化选项和source
1 #构建一些初始化选项,在context对象中被设置
2 context.CLIARGS = ImmutableDict(connection='smart', module_path=['/to/mymodules', '/usr/share/ansible'], forks=10, become=None,
3                                 become_method=None, become_user=None, check=False, diff=False)
4 #source指inventory文件数据,默认指/etc/ansible/hosts,也可读取yaml/json/ini格式的文件,或者主机名以逗号隔开的字符串。
5 host_list = ['localhost', 'www.example.com', 'www.google.com']
6 sources = ','.join(host_list)
7 if len(host_list) == 1:
8     sources += ','
  • 实例化对象
1 #初始化所需的对象集
2 #管理资源库,读取yaml/json/ini格式的文件,主要用于读取inventory文件
3 loader =DataLoader()
4 #密码这里是必须使用的一个参数,假如通过公钥信任,也可以给一个空字典:dict()
5 passwords = dict(vault_pass='secret')
6 
7 #回调对象,用于查询执行结果
8 results_callback =ResultsCollectorJSONCallback()
9 
10 #资源管理器,创建inventory对象,引入上面的source。
11 #也可以:sources='/etc/ansible/hosts'。也可以:sources='host1,host2,...'
12 inventory = InventoryManager(loader=loader, sources=sources)
13 
14 #变量管理器,管理变量
15 variable_manager = VariableManager(loader=loader, inventory=inventory)
  • 任务队列管理器
1 #实例化任务队列管理器
2 #tip: 会加载库目录到loader里
3 #tip: 必须在`Play.load()`之前初始化队列实例
4 tqm =TaskQueueManager(
5     inventory=inventory,
6     variable_manager=variable_manager,
7     loader=loader,
8     passwords=passwords,
9     stdout_callback=results_callback,  #回调实例
10 )
  • ansible命令模块和参数
1 play_source =dict(
2     name="Ansible Play",  #名字,只未方便阅读
3     hosts=host_list,      #执行主机列表
4     gather_facts='no',
5     tasks=[
6         dict(action=dict(module='shell', args='ls'), register='shell_out'),
7         dict(action=dict(module='debug', args=dict(msg='{{shell_out.stdout}}'))),
8         dict(action=dict(module='command', args=dict(cmd='/usr/bin/uptime'))),
9 ]
10 )
  • 执行
1 #定义play对象
2 play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
3 
4 #在队列中执行play对象
5 try:
6     result = tqm.run(play)  #执行的结果返回码,成功是 0
7 finally:
8     #如果 `tqm` 不是 `None`, 需要清理子进程和我们用来与它们通信的结构。
9 tqm.cleanup()
10     ifloader:
11 loader.cleanup_all_tmp_files()
12 
13 #最后删除 ansible 产生的临时目录
14 #这个临时目录会在 ~/.ansible/tmp/ 目录下
15 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
  • 查询结果
1 print("UP ***********")
2 for host, result inresults_callback.host_ok.items():
3     print('{0} >>> {1}'.format(host, result._result['stdout']))
4 
5 print("FAILED *******")
6 for host, result inresults_callback.host_failed.items():
7     print('{0} >>> {1}'.format(host, result._result['msg']))
8 
9 print("DOWN *********")
10 for host, result inresults_callback.host_unreachable.items():
11     print('{0} >>> {1}'.format(host, result._result['msg']))

2.3、调用playbook

1 from ansible.executor.playbook_executor importPlaybookExecutor
2 
3 playbook = PlaybookExecutor(playbooks=['/root/test.yml'],  #tip:这是列表
4                  inventory=inventory,
5                  variable_manager=variable_manager,
6                  loader=loader,
7                  passwords=passwords)
8 
9 #使用回调函数
10 playbook._tqm._stdout_callback =results_callback
11 
12 result =playbook.run()
13 
14 for host, result inresults_callback.host_ok.items():
15     print("主机{}, 执行结果{}".format(host, result._result['result']['stdout'])

三、二次开发

1 #-*- coding:utf-8 -*-
2 importjson
3 importshutil
4 from ansible.module_utils.common.collections importImmutableDict
5 from ansible importcontext
6 from ansible.parsing.dataloader importDataLoader
7 from ansible.vars.manager importVariableManager
8 from ansible.inventory.manager importInventoryManager
9 from ansible.playbook.play importPlay
10 from ansible.executor.playbook_executor importPlaybookExecutor
11 from ansible.executor.task_queue_manager importTaskQueueManager
12 from ansible.plugins.callback importCallbackBase
13 importansible.constants as C
14 
15 
16 classResultCallback(CallbackBase):
17     """
18 重写callbackBase类的部分方法
19     """
20 
21     def __init__(self, *args, **kwargs):
22         #python3支持这样的语法
23         #super().__init__(*args, **kwargs)
24         #python2要用这样的语法
25         super(ResultCallback, self).__init__(*args, **kwargs)
26         self.host_ok ={}
27         self.host_unreachable ={}
28         self.host_failed ={}
29         #self.task_ok = {}
30 
31     defv2_runner_on_unreachable(self, result):
32         self.host_unreachable[result._host.get_name()] =result
33 
34     def v2_runner_on_ok(self, result, **kwargs):
35         self.host_ok[result._host.get_name()] =result
36 
37     def v2_runner_on_failed(self, result, **kwargs):
38         self.host_failed[result._host.get_name()] =result
39 
40 
41 classMyAnsiable():
42     def __init__(self,
43                  connection='local',
44                  module_path=None,
45                  remote_user=None,
46                  ack_pass=None,
47                  sudo=None,
48                  sudo_user=None,
49                  ask_sudo_pass=None,
50                  become=None,
51                  become_method=None,
52                  become_user=None,
53                  listhosts=None,
54                  listtasks=None,
55                  listtags=None,
56                  verbosity=3,
57                  syntax=None,
58                  start_at_task=None,
59                  check=False,
60                  diff=False,
61                  inventory=None,
62                  passwords=None):
63         """
64 初始化函数,定义的默认的选项值,
65 在初始化的时候可以传参,以便覆盖默认选项的值
66         """
67         context.CLIARGS =ImmutableDict(
68             connection=connection,
69             module_path=module_path,
70             remote_user=remote_user,
71             ack_pass=ack_pass,
72             sudo=sudo,
73             sudo_user=sudo_user,
74             ask_sudo_pass=ask_sudo_pass,
75             become=become,
76             become_method=become_method,
77             become_user=become_user,
78             listhosts=listhosts,
79             listtasks=listtasks,
80             listtags=listtags,
81             verbosity=verbosity,
82             syntax=syntax,
83             start_at_task=start_at_task,
84             check=check,
85             diff=diff
86 )
87 
88         #三元表达式,假如没有传递 inventory, 就使用 "localhost,"
89         #确定 inventory 文件
90         self.inventory = inventory if inventory else "localhost,"
91 
92         #三元表达式,假如没有传递 passwords, 就使用 {}
93         #设置密码,可以为空字典,但必须有此参数
94         self.passwords = passwords if passwords else{}
95 
96         #实例化数据解析器
97         self.loader =DataLoader()
98 
99         #实例化 资产配置对象
100         self.inventory_manager_obj = InventoryManager(loader=self.loader, sources=self.inventory)
101 
102         #变量管理器
103         self.variable_manager_obj =VariableManager(self.loader, self.inventory_manager_obj)
104 
105         #实例化回调插件对象
106         self.results_callback =ResultCallback()
107 
108     def adhoc(self, name='Ad-hoc', hosts='localhost', gather_facts="no", module="ping", args=''):
109         play_source =dict(
110             name=name,
111             hosts=hosts,
112             gather_facts=gather_facts,
113             tasks=[
114                 #这里每个 task 就是这个列表中的一个元素,格式是嵌套的字典
115                 #也可以作为参数传递过来,这里就简单化了。
116                 {"action": {"module": module, "args": args}},
117 ])
118 
119         play = Play().load(play_source, variable_manager=self.variable_manager_obj, loader=self.loader)
120 
121         tqm =None
122         try:
123             tqm =TaskQueueManager(
124                 inventory=self.inventory_manager_obj,
125                 variable_manager=self.variable_manager_obj,
126                 loader=self.loader,
127                 passwords=self.passwords,
128                 stdout_callback=self.results_callback
129 )
130             result =tqm.run(play)
131         finally:
132             if tqm is notNone:
133 tqm.cleanup()
134 shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
135 
136     defplaybook(self, playbooks):
137         playbook = PlaybookExecutor(playbooks=playbooks,  #Tip: 这里是一个列表
138                                     inventory=self.inventory_manager_obj,
139                                     variable_manager=self.variable_manager_obj,
140                                     loader=self.loader,
141                                     passwords=self.passwords
142 )
143 
144         #使用回调函数
145         playbook._tqm._stdout_callback =self.results_callback
146         #执行
147         result =playbook.run()
148 
149     defget_result(self):
150         result_raw = {'success': {}, 'failed': {}, 'unreachable': {}}
151         #print(self.results_callback.host_ok)
152         for host, result inself.results_callback.host_ok.items():
153             result_raw['success'][host] =result._result
154 
155         #print(self.results_callback.host_failed)
156         for host, result inself.results_callback.host_failed.items():
157             result_raw['failed'][host] =result._result
158 
159         #print(self.results_callback.host_unreachable)
160         for host, result inself.results_callback.host_unreachable.items():
161             result_raw['unreachable'][host] =result._result
162 
163         #打印结果,并且使用 JSON 格式化
164         print(json.dumps(result_raw, indent=4))
View Code

四、运行示例

4.1、执行默认ad-hoc

#-*- coding:utf-8 -*-
from MyAnsible importMyAnsiable

ansible_obj =MyAnsiable()
ansible_obj.adhoc()
ansible_obj.get_result()

结果:

{
    "success": {
        "localhost": {
            "ping": "pong",
            "invocation": {
                "module_args": {
                    "data": "pong"}
            },
            "ansible_facts": {
                "discovered_interpreter_python": "/usr/bin/python"},
            "_ansible_no_log": false,
            "changed": false
        }
    },
    "failed": {},
    "unreachable": {}
}
View Code

4.2、远程执行命令

#文件名: /root/ansible-playbook/api/hosts
[tapi]
10.96.0.59

#文件名: ansible03.py#-*- coding:utf-8 -*-
from MyAnsible importMyAnsiable

#配置资产配置文件,并使用 ssh 的远程连接方式
ansible_obj = MyAnsiable(inventory='/root/ansible-playbook/api/hosts', connection='smart')
#执行对象是hosts里的组名
ansible_obj.adhoc(hosts='tapi', module='shell', args='ip a |grep "inet"')
ansible_obj.get_result()

结果:

{
    "failed": {}, 
    "success": {
        "10.96.0.59": {
            "stderr_lines": [], 
            "cmd": "ip a |grep \"inet\"", 
            "end": "2021-01-28 15:41:25.972706", 
            "_ansible_no_log": false, 
            "stdout": "inet 127.0.0.1/8 scope host lo\n    inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0", 
            "changed": true, 
            "rc": 0, 
            "start": "2021-01-28 15:41:25.960580", 
            "stderr": "", 
            "delta": "0:00:00.012126", 
            "invocation": {
                "module_args": {
                    "creates": null, 
                    "executable": null, 
                    "_uses_shell": true, 
                    "strip_empty_ends": true, 
                    "_raw_params": "ip a |grep \"inet\"", 
                    "removes": null, 
                    "argv": null, 
                    "warn": true, 
                    "chdir": null, 
                    "stdin_add_newline": true, 
                    "stdin": null
                }
            }, 
            "stdout_lines": [
                "inet 127.0.0.1/8 scope host lo", 
                "inet 10.96.0.59/24 brd 10.96.0.255 scope global dynamic eth0"], 
            "ansible_facts": {
                "discovered_interpreter_python": "/usr/bin/python"}
        }
    }, 
    "unreachable": {}
}
View Code

4.3、调用playbook

研究过程中,发现不能传递额外的系统变量, 后面实例懒的写了,看完我前面的代码,执行playbook应该不成问题。

参考资料

  • 二开:https://zhuanlan.zhihu.com/p/118411015
  • http://www.linuxboy.net/ansiblejc/143275.html
  • ansible2.4版本前后模块差异:https://blog.csdn.net/qq_29832217/article/details/97005626
  • 常用模块使用:https://blog.csdn.net/yyy72999/article/details/81184720
  • 官网2.7 https://docs.ansible.com/ansible/2.7/dev_guide/developing_api.html
  • 官网2.8https://docs.ansible.com/ansible/2.8/dev_guide/developing_api.html
  • 官网2.10 https://docs.ansible.com/ansible/latest/dev_guide/developing_api.html

免责声明:文章转载自《Ansible API》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇股票涨跌和买卖预测计算公式Linux基础学习 | gcc、g++的安装和使用下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

zabbix详解(一)

zabbix简介 zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。 zabbix能监视各种网络参数,保证服务器系统的安全运营;并提供柔软的通知机制以让系统管理员快速定位/解决存在的各种问题。 zabbix由3部分构成,zabbixserver、可选组件zabbix proxy、可选组件zabbix agent。...

ubuntu mysql允许root用户远程登录

有两种方法: 一、 1、mysql>GRANT ALL PRIVILEGES ON *.*TO 'root'@'%'IDENTIFIED BY '123456'WITH GRANT OPTION; 2、mysql>FLUSH PRIVILEGES; 二、 1、mysql>use mysql; 2、mysql>update user...

rabbitmq消费端加入精确控频。

控制频率之前用的是线程池的数量来控制,很难控制。因为做一键事情,做一万次,并不是每次消耗的时间都相同,所以很难推测出到底多少线程并发才刚好不超过指定的频率。 现在在框架中加入控频功能,即使开200线程,也能保证1秒钟只运行10次任务。 里面的rabbitpy后来加的,然来是使用pika的,就框架本身得实现写法上违反了开闭原则,没设计太好。好在不影响调用方式...

六.ansible批量管理服务

期中集群架构-第六章-ansible批量管理服务介绍====================================================================== 01. 批量管理服务知识介绍a. ansible是一个基于Python开发的自动化运维工具b. ansible是一个基于ssh协议实现远程管理的工具c. ansib...

docke网络之bridge、host、none

一、bridge网络 1.创建一个测试容器 [root@localhost ~]# docker run -d -it --name busybox_1 busybox /bin/sh -c "while true;do sleep 3600;done" 03b308c847edd23f21ba69afb825d92f7aaeb05b1ff4431dd47...

Ansible自动化运维笔记3(playbook)

1.基本语法### playbook文件格式为yaml语法.示例如下: 1.1 nginx.yaml --- - hosts: all tasks: - name: Install Nginx Package yum: name=nginx state=present - name: Copy Nginx.co...